RabbitMQ 简介
消息队列中间件简介
消息队列中间件是分布式系统中重要的组件,$\color{red}{主要解决应用耦合,异步消息,流量削锋等问
题实现高性能,高可用,可伸缩和最终一致性[架构] }$使用较多的消息队列有 ActiveMQ,
RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ(基于Kafka,由阿里研发)
以下介绍消息队列在实际应用中常用的使用场景:$\color{red}{异步处理,应用解耦,流量削锋和消息通讯四个场景}$
什么是 RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,
为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开
发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、
高可用性等方面表现不俗。
具体特点包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如$\color{green}{持久化、传输确认、发布确认。}$
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已
经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑
定在一起,也通过插件机制实现自己的 Exchange 。
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如$\color{green}{ STOMP、MQTT 等等。}$
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件
架构图和主要概念
主要概念
RabbitMQ Server:
也叫 broker server,它是一种传输服务。 他的角色就是维护一条从
Producer 到 Consumer 的路线,保证数据能够按照指定的方式进行传输。
Producer:
消息生产者,如上图 A、B、C,数据的发送方。消息生产者连接 RabbitMQ 服务器
然后将消息投递到 Exchange。
Consumer:
消息消费者,如上图 1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ 将 Queue 中的消息发送到消息消费者。
Exchange:
生产者将消息发送到 Exchange(交换器),由 Exchange 将消息路由到一个或多
个 Queue 中(或者丢弃)。Exchange 并不存储消息。RabbitMQ 中的 Exchange 有$\color{red}{ direct(直连)、fanout(分裂)、
topic(主题)、headers(头) 四种类型,每种类型对应不同的路由规则。}$
Queue:(队列)
是 RabbitMQ 的内部对象,用于存储消息。消息消费者就是通过订阅队列
来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,生产者生产消息并最终投递到
Queue 中,消费者可以从 Queue 中获取消息并消费。多个消费者可以订阅同一个 Queue,
这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有
的消息并处理。(使用轮循的方式获得消息)
RoutingKey:
$\color{red}{生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定
这个消息的路由规则,而这个 routing key 需要与 Exchange Type 及 binding key 联合使用才能
最终生效。}$在 Exchange Type 与 binding key 固定的情况下(在正常使用时一般这些内容都是
固定配置好的),$\color{green}{我们的生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来
决定消息流向哪里。(就是一种匹配规则)}$RabbitMQ 为 routing key 设定的长度限制为 255bytes。
Connection:(连接)
Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server
的。以后我们可以看到,程序的起始处就是建立这个 TCP 连接。
Channels:(信道):
它建立在上述的 TCP 连接中。数据流动都是在 Channel 中进行的。也
就是说,$\color{red}{一般情况是程序起始建立 TCP 连接,第二步就是建立这个 Channel。}$
VirtualHost:
权限控制的基本单位,一个 VirtualHost 里面有若干 Exchange 和MessageQueue,以及指定被哪些 user 使用
RabbitMQ 安装与启动
window 安装
下载并安装 Eralng,以管理员身份运行安装)
下载并安装 rabbitmq,注意不要安装在包含中文和空格的目录下!安装后 window 服务中就存在 rabbitMQ 了,并且是启动状态。
安装管理界面(插件)
需要重启电脑,再重新启动服务
进入 rabbitMQ 安装目录的 sbin 目录,输入命令
1
|
rabbitmq‐plugins enable rabbitmq_management
|
打开浏览器,地址栏输入 http://127.0.0.1:15672 ,即可看到管理界面的登陆页
输入用户名和密码,都为 guest 进入主界面
最上侧的导航以此是:概览、连接、信道、交换器、队列、用户管理
docker 安装
下载镜像:
1
|
docker pull rabbitmq:management
|
创建容器,rabbitmq 需要有映射以下端口:
1
2
3
4
5
6
7
|
5671 5672 4369 15671 15672 25672
15672 (if management plugin is enabled)
15671 management 监听端口
5672, 5671 (AMQP 0-9-1 without and with TLS)
4369 (epmd) epmd 代表 Erlang 端口映射守护进程
25672 (Erlang distribution)
|
1
2
|
docker run -di --name=myRabbitmq -p 5671:5617 -p 5672:5672 -p
4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management
|
浏览器访问 http://192.168.2.2:15672/#/
直接模式(Direct)
什么是 Direct 模式
我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。
任何发送到 Direct Exchange 的消息都会被转发到 RouteKey 中指定的 Queue。
1.一般情况可以使用 rabbitMQ 自带的 Exchange:”"(该 Exchange 的名字为空字符串,下文称其为 default Exchange)。
2.这种模式下不需要将 Exchange 进行任何绑定(binding)操作
3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
4.如果 vhost 中不存在 RouteKey 中指定的队列名,则该消息会被抛弃。
创建队列
做下面的例子前,我们先建立一个叫 itcast 的队列。
Durability:是否做持久化 Durable(持久) transient(临时)
Auto delete : 是否自动删除
代码实现-消息生产者
导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件 application.yml
1
2
3
4
5
|
server:
port: 9204
spring:
rabbitmq:
host: 127.0.0.1
|
启动器
1
2
3
4
5
6
|
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
|
测试类===生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void directTest(){
rabbitTemplate.convertAndSend("JeremyDirect","直连模式");
}
}
|
消费者
1
2
3
4
5
6
7
8
9
10
11
|
@Component
@RabbitListener(queues = "JeremyDirect")
public class Consumer1 {
@RabbitHandler
public void sendMessage(String mes){
System.out.println("接受到消息:"+mes);
}
}
|
修改服务端口,同时运行多个程序,会发现RabbitMQ采用轮询的方式,进行消费,这样的好处,能够平均的获得消息,并不会随机,导致有的很多,有的基本没有
分列模式(Fanout)
当我们需要将消息一次发给多个队列时,需要使用这种模式。如下图:
任何发送到 Fanout Exchange 的消息都会被转发到与该 Exchange 绑定(Binding)的所有Queue 上。
1.可以理解为路由表的模式
2.这种模式不需要 RouteKey
3.这种模式需要提前将 Exchange 与 Queue 进行绑定,一个 Exchange 可以绑定多个Queue,一个 Queue 可以同多个 Exchange 进行绑定。
4.如果接受到消息的 Exchange 没有与任何 Queue 绑定,则消息会被抛弃 。
交换器绑定队列
创建多两个队列
创建一个交换器
交换器绑定队列
代码实现
1
2
3
4
5
6
7
|
/**
* 分列模式
*/
@Test
public void FanoutTest(){
rabbitTemplate.convertAndSend("JeremyFanout","","分列模式");
}
|
消费者1
1
2
3
4
5
6
7
8
9
10
11
12
|
*/
@Component
@RabbitListener(queues = "JeremyFanout")
public class ConsumerFanout {
@RabbitHandler
public void JeremyFanoutTest(String msg){
System.out.println("JeremyFanout:"+msg);
}
}
|
消费者2
1
2
3
4
5
6
7
8
9
10
|
@Component
@RabbitListener(queues = "JeremyFanout1")
public class ConsumerFanout1 {
@RabbitHandler
public void JeremyFanout1Test(String msg){
System.out.println("JeremyFanout1:"+msg);
}
}
|
主题模式(Topic)
任何发送到 Topic Exchange 的消息都会被转发到所有关心 RouteKey 中指定话题的 Queue 上
如上图所示
此类交换器使得来自不同的源头的消息可以到达一个对列,其实说的更明白一点就是模
糊匹配的意思。
例如:上图中红色对列的 routekey 为 usa.#,#代表匹配任意字符,但是要想消息能到达此对列,usa.必须匹配后面的#好可以随意。图中 usa.news
usa.weather,都能找到红色队列,符号 # 匹配一个或多个词。
符号 * 匹配不多不少一个词。因此 usa.# 能够匹配到 usa.news.XXX ,但是 usa.* 只会匹配到 usa.XXX 。
注:
交换器说到底是一个名称与队列绑定的列表。当消息发布到交换器时,实际上是由你所
连接的信道,将消息路由键同交换器上绑定的列表进行比较,最后路由消息。
任何发送到 Topic Exchange 的消息都会被转发到所有关心 RouteKey 中指定话题的Queue 上
-
这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一
个“标题”(RouteKey),Exchange 会将消息转发到所有关注主题能与 RouteKey 模糊匹配的
队列。
-
这种模式需要 RouteKey,也需要提前绑定 Exchange 与 Queue。
-
在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及
log 的消息(一个 RouteKey 为”MQ.log.error”的消息会被转发到该队列)。
“#”表示 0 个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,
无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
-
同样,如果 Exchange 没有发现能够与 RouteKey 匹配的 Queue,则会抛弃此消息
创建队列与绑定
新建一个交换器 ,类型选择 topic
添加匹配规则,添加后列表如下:
设置匹配主题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
* 主题模式1
*/
@Test
public void TopicTest(){
rabbitTemplate.convertAndSend("JeremyTopic","Jeremy.SZE","匹配单个主题模式");
}
/**
* 主题二
*/
@Test
public void TopicTest1(){
rabbitTemplate.convertAndSend("JeremyTopic","123.log","匹配多个主题模式");
}
@Test
public void TopicTest2(){
rabbitTemplate.convertAndSend("JeremyTopic","Jeremy.log","匹配单个主题模式");
}
|
消费者
1
2
3
4
5
6
7
8
9
10
|
@Component
@RabbitListener(queues = "JeremyFanout")
public class ConsumerTopic1 {
@RabbitHandler
public void JeremyFanoutTest(String msg){
System.out.println("匹配单个主题模式:"+msg);
}
}
|
使用阿里云大于发送短信
阿里大于简介
阿里大于是阿里云旗下产品,融合了三大运营商的通信能力,通过将传统通信业务和能力与互联网相结合,创新融合阿里巴巴生态内容,全力为中小企业和开发者提供优质服务阿里大于提供包括短信、语音、流量直充、私密专线、店铺手机号等个性化服务。通过阿里大于打通三大运营商通信能力,全面融合阿里巴巴生态,以开放API及SDK的方式向开发者提供通信和数据服务,更好地支撑企业业务发展和创新服务。
准备工作
注册账户
首先我们先进入“阿里大于”www.alidayu.com(https://dayu.aliyun.com/)
注册账号后,再在手机下载“阿里云”应用,登录,然后进行在线实名认证。
点击进入控制台:
点击国内消息
申请模板
创建accesskeys
下载SDK及DEMO
下载地址:https://help.aliyun.com/document_detail/55359.html
1
2
3
4
5
6
7
8
9
10
11
|
/**
* 生成验证码
* @param mobile
* @return
*/
@RequestMapping(value = "/sendsms/{mobile}",method = RequestMethod.POST)
public Result saveCodeAndMobile(@PathVariable String mobile){
userService.saveCodeAndMobile(mobile);
return new Result(true,StatusCode.OK,"保存验证码和手机号成功");
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/**
* 添加手机号和验证码到MQ及Redis
*/
public void saveCodeAndMobile(String mobile){
//生成验证码
String randomCode = RandomStringUtils.randomNumeric(6);
redisTemplate.opsForValue().set("smsCode_"+mobile,randomCode,5, TimeUnit.MINUTES);
Map<String,Object> map=new HashMap<String, Object>();
map.put("code",randomCode);
map.put("mobile",mobile);
rabbitTemplate.convertAndSend("sms",map);
}
|
注册用户
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
* 注册
* @param user
* @param code
*/
@RequestMapping(value = "/register/{code}",method = RequestMethod.POST)
public Result register(@RequestBody User user, @PathVariable String code){
if (code==null||"".equals(code)){
return new Result(false,StatusCode.ERROR,"请输入验证码");
}
String rediscode=(String) redisTemplate.opsForValue().get("smsCode_"+user.getMobile());
if (rediscode==null || !rediscode.equals(code)){
return new Result(false,StatusCode.ERROR,"您输入的验证码不对");
}
userService.register(user);
return new Result(true,StatusCode.OK,"注册成功");
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/**
* 注册
* @param user
*
*/
public void register(User user){
user.setId(idWorker.nextId()+"");
user.setRegdate(new Date());
user.setFanscount(0);
user.setFollowcount(0);
userDao.save(user);
}
|
短信监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
package com.tensquare.sms.listener;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
import com.rabbitmq.client.Channel;
import com.tensquare.sms.until.SmsUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 短信监听
*/
@Component
@RabbitListener(queues = "sms")
public class SmsListener {
@Autowired
private SmsUtil smsUtil;
@Value("${aliyun.sms.temp_code}")
private String tempCode;
@Value("${aliyun.sms.sign_name}")
private String signName;
/**
* 处理方法
*/
@RabbitHandler
public void handleMsg(Map<String,String> msg, Channel channel, Message message){
System.out.println("手机号码:"+msg.get("mobile"));
System.out.println("验证码:"+msg.get("code"));
//使用阿里云短信发送手机验证码
try {
SendSmsResponse sendSmsResponse = smsUtil.sendSms(msg.get("mobile"), tempCode, signName, "{\"code\":\"" + msg.get("code") + "\"}");
if(sendSmsResponse.getCode().equals("OK")){
System.out.println("短信发送成功");
//手动发送确认消息给MQ,让MQ删除对应的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
System.out.println("短信发送失败:"+sendSmsResponse.getCode());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
|
短信工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package com.tensquare.sms.until;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsRequest;
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsResponse;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 短信工具类
* @author Administrator
*
*/
@Component
public class SmsUtil {
//产品名称:云通信短信API产品,开发者无需替换
static final String product = "Dysmsapi";
//产品域名,开发者无需替换
static final String domain = "dysmsapi.aliyuncs.com";
@Autowired
private Environment env;
// TODO 此处需要替换成开发者自己的AK(在阿里云访问控制台寻找)
/**
* 发送短信
* @param mobile 手机号
* @param template_code 模板号
* @param sign_name 签名
* @param param 参数
* @return
* @throws ClientException
*/
public SendSmsResponse sendSms(String mobile,String template_code,String sign_name,String param) throws ClientException {
String accessKeyId =env.getProperty("aliyun.sms.accessKeyId");
String accessKeySecret = env.getProperty("aliyun.sms.accessKeySecret");
//可自助调整超时时间
System.setProperty("sun.net.client.defaultConnectTimeout", "10000");
System.setProperty("sun.net.client.defaultReadTimeout", "10000");
//初始化acsClient,暂不支持region化
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
DefaultProfile.addEndpoint("cn-hangzhou", "cn-hangzhou", product, domain);
IAcsClient acsClient = new DefaultAcsClient(profile);
//组装请求对象-具体描述见控制台-文档部分内容
SendSmsRequest request = new SendSmsRequest();
//必填:待发送手机号
request.setPhoneNumbers(mobile);
//必填:短信签名-可在短信控制台中找到
request.setSignName(sign_name);
//必填:短信模板-可在短信控制台中找到
request.setTemplateCode(template_code);
//可选:模板中的变量替换JSON串,如模板内容为"亲爱的${name},您的验证码为${code}"时,此处的值为
request.setTemplateParam(param);
//选填-上行短信扩展码(无特殊需求用户请忽略此字段)
//request.setSmsUpExtendCode("90997");
//可选:outId为提供给业务方扩展字段,最终在短信回执消息中将此值带回给调用者
request.setOutId("yourOutId");
//hint 此处可能会抛出异常,注意catch
SendSmsResponse sendSmsResponse = acsClient.getAcsResponse(request);
return sendSmsResponse;
}
public QuerySendDetailsResponse querySendDetails(String mobile,String bizId) throws ClientException {
String accessKeyId =env.getProperty("accessKeyId");
String accessKeySecret = env.getProperty("accessKeySecret");
//可自助调整超时时间
System.setProperty("sun.net.client.defaultConnectTimeout", "10000");
System.setProperty("sun.net.client.defaultReadTimeout", "10000");
//初始化acsClient,暂不支持region化
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
DefaultProfile.addEndpoint("cn-hangzhou", "cn-hangzhou", product, domain);
IAcsClient acsClient = new DefaultAcsClient(profile);
//组装请求对象
QuerySendDetailsRequest request = new QuerySendDetailsRequest();
//必填-号码
request.setPhoneNumber(mobile);
//可选-流水号
request.setBizId(bizId);
//必填-发送日期 支持30天内记录查询,格式yyyyMMdd
SimpleDateFormat ft = new SimpleDateFormat("yyyyMMdd");
request.setSendDate(ft.format(new Date()));
//必填-页大小
request.setPageSize(10L);
//必填-当前页码从1开始计数
request.setCurrentPage(1L);
//hint 此处可能会抛出异常,注意catch
QuerySendDetailsResponse querySendDetailsResponse = acsClient.getAcsResponse(request);
return querySendDetailsResponse;
}
}
|