RabbitMQ 简介

消息队列中间件简介

  消息队列中间件是分布式系统中重要的组件,$\color{red}{主要解决应用耦合,异步消息,流量削锋等问 题实现高性能,高可用,可伸缩和最终一致性[架构] }$使用较多的消息队列有 ActiveMQ, RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ(基于Kafka,由阿里研发) 以下介绍消息队列在实际应用中常用的使用场景:$\color{red}{异步处理,应用解耦,流量削锋和消息通讯四个场景}$

什么是 RabbitMQ

  RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准, 为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开 发语言等条件的限制。
  RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、 高可用性等方面表现不俗。

具体特点包括:

可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如$\color{green}{持久化、传输确认、发布确认。}$

灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已 经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑 定在一起,也通过插件机制实现自己的 Exchange 。

消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如$\color{green}{ STOMP、MQTT 等等。}$

多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

插件机制(Plugin System)

RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件

架构图和主要概念

alt rabbitMQ架构图

主要概念

RabbitMQ Server:

也叫 broker server,它是一种传输服务。 他的角色就是维护一条从 Producer 到 Consumer 的路线,保证数据能够按照指定的方式进行传输。

Producer:

消息生产者,如上图 A、B、C,数据的发送方。消息生产者连接 RabbitMQ 服务器 然后将消息投递到 Exchange。

Consumer:

消息消费者,如上图 1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ 将 Queue 中的消息发送到消息消费者。

Exchange:

生产者将消息发送到 Exchange(交换器),由 Exchange 将消息路由到一个或多 个 Queue 中(或者丢弃)。Exchange 并不存储消息。RabbitMQ 中的 Exchange 有$\color{red}{ direct(直连)、fanout(分裂)、 topic(主题)、headers(头) 四种类型,每种类型对应不同的路由规则。}$

Queue:(队列)

是 RabbitMQ 的内部对象,用于存储消息。消息消费者就是通过订阅队列 来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,生产者生产消息并最终投递到 Queue 中,消费者可以从 Queue 中获取消息并消费。多个消费者可以订阅同一个 Queue, 这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有 的消息并处理。(使用轮循的方式获得消息)

RoutingKey:

$\color{red}{生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定 这个消息的路由规则,而这个 routing key 需要与 Exchange Type 及 binding key 联合使用才能 最终生效。}$在 Exchange Type 与 binding key 固定的情况下(在正常使用时一般这些内容都是 固定配置好的),$\color{green}{我们的生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来 决定消息流向哪里。(就是一种匹配规则)}$RabbitMQ 为 routing key 设定的长度限制为 255bytes。

Connection:(连接)

Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server 的。以后我们可以看到,程序的起始处就是建立这个 TCP 连接。

Channels:(信道):

它建立在上述的 TCP 连接中。数据流动都是在 Channel 中进行的。也 就是说,$\color{red}{一般情况是程序起始建立 TCP 连接,第二步就是建立这个 Channel。}$

VirtualHost:

权限控制的基本单位,一个 VirtualHost 里面有若干 Exchange 和MessageQueue,以及指定被哪些 user 使用

RabbitMQ 安装与启动

window 安装

下载并安装 Eralng,以管理员身份运行安装)
下载并安装 rabbitmq,注意不要安装在包含中文和空格的目录下!安装后 window 服务中就存在 rabbitMQ 了,并且是启动状态。
安装管理界面(插件)
需要重启电脑,再重新启动服务
进入 rabbitMQ 安装目录的 sbin 目录,输入命令

1
rabbitmq‐plugins enable rabbitmq_management  

打开浏览器,地址栏输入 http://127.0.0.1:15672 ,即可看到管理界面的登陆页 alt rabbitMQ服务端UI 输入用户名和密码,都为 guest 进入主界面
alt rabbitMQ服务端UI1 最上侧的导航以此是:概览、连接、信道、交换器、队列、用户管理

docker 安装

下载镜像:

1
docker pull rabbitmq:management

创建容器,rabbitmq 需要有映射以下端口:

1
2
3
4
5
6
7
5671 5672 4369 15671 15672 25672

 15672 (if management plugin is enabled)
 15671 management 监听端口
 5672, 5671 (AMQP 0-9-1 without and with TLS)
 4369 (epmd) epmd 代表 Erlang 端口映射守护进程
 25672 (Erlang distribution)
1
2
docker run -di --name=myRabbitmq -p 5671:5617 -p 5672:5672 -p
4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management

浏览器访问 http://192.168.2.2:15672/#/

直接模式(Direct)

什么是 Direct 模式

我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。 alt 直连模式 任何发送到 Direct Exchange 的消息都会被转发到 RouteKey 中指定的 Queue。
1.一般情况可以使用 rabbitMQ 自带的 Exchange:”"(该 Exchange 的名字为空字符串,下文称其为 default Exchange)。
2.这种模式下不需要将 Exchange 进行任何绑定(binding)操作
3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
4.如果 vhost 中不存在 RouteKey 中指定的队列名,则该消息会被抛弃。

创建队列

做下面的例子前,我们先建立一个叫 itcast 的队列。 alt 创建队列 Durability:是否做持久化 Durable(持久) transient(临时)
Auto delete : 是否自动删除

代码实现-消息生产者

导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置文件 application.yml

1
2
3
4
5
server:
  port: 9204
spring:
  rabbitmq:
    host: 127.0.0.1

启动器

1
2
3
4
5
6
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }
}

测试类===生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqTest {

    @Autowired
private RabbitTemplate rabbitTemplate;
    @Test
    public void directTest(){

        rabbitTemplate.convertAndSend("JeremyDirect","直连模式");

    }
}

消费者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Component
@RabbitListener(queues = "JeremyDirect")
public class Consumer1 {

    @RabbitHandler
    public void sendMessage(String mes){

        System.out.println("接受到消息:"+mes);
    }

}

修改服务端口,同时运行多个程序,会发现RabbitMQ采用轮询的方式,进行消费,这样的好处,能够平均的获得消息,并不会随机,导致有的很多,有的基本没有

分列模式(Fanout)

当我们需要将消息一次发给多个队列时,需要使用这种模式。如下图: alt RabbitMQ分裂模式 任何发送到 Fanout Exchange 的消息都会被转发到与该 Exchange 绑定(Binding)的所有Queue 上。 1.可以理解为路由表的模式
2.这种模式不需要 RouteKey
3.这种模式需要提前将 Exchange 与 Queue 进行绑定,一个 Exchange 可以绑定多个Queue,一个 Queue 可以同多个 Exchange 进行绑定。
4.如果接受到消息的 Exchange 没有与任何 Queue 绑定,则消息会被抛弃 。

交换器绑定队列

创建多两个队列 alt 创建队列1 创建一个交换器 alt 创建多一个交换器 交换器绑定队列 alt 交换器绑定队列 代码实现

1
2
3
4
5
6
7
 /**
     * 分列模式
     */
    @Test
    public void FanoutTest(){
        rabbitTemplate.convertAndSend("JeremyFanout","","分列模式");
    }

消费者1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 */
@Component
@RabbitListener(queues = "JeremyFanout")
public class ConsumerFanout {


    @RabbitHandler
    public void JeremyFanoutTest(String msg){

        System.out.println("JeremyFanout:"+msg);
    }
}

消费者2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Component
@RabbitListener(queues = "JeremyFanout1")
public class ConsumerFanout1 {

    @RabbitHandler
    public void JeremyFanout1Test(String msg){

        System.out.println("JeremyFanout1:"+msg);
    }
}

主题模式(Topic)

任何发送到 Topic Exchange 的消息都会被转发到所有关心 RouteKey 中指定话题的 Queue 上 alt 主题模式 如上图所示
  此类交换器使得来自不同的源头的消息可以到达一个对列,其实说的更明白一点就是模 糊匹配的意思。
例如:上图中红色对列的 routekey 为 usa.#,#代表匹配任意字符,但是要想消息能到达此对列,usa.必须匹配后面的#好可以随意。图中 usa.news usa.weather,都能找到红色队列,符号 # 匹配一个或多个词。
符号 * 匹配不多不少一个词。因此 usa.# 能够匹配到 usa.news.XXX ,但是 usa.* 只会匹配到 usa.XXX 。
注:
  交换器说到底是一个名称与队列绑定的列表。当消息发布到交换器时,实际上是由你所 连接的信道,将消息路由键同交换器上绑定的列表进行比较,最后路由消息。
  任何发送到 Topic Exchange 的消息都会被转发到所有关心 RouteKey 中指定话题的Queue 上

  1. 这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一 个“标题”(RouteKey),Exchange 会将消息转发到所有关注主题能与 RouteKey 模糊匹配的 队列。

  2. 这种模式需要 RouteKey,也需要提前绑定 Exchange 与 Queue。

  3. 在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及 log 的消息(一个 RouteKey 为”MQ.log.error”的消息会被转发到该队列)。 “#”表示 0 个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配, 无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

  4. 同样,如果 Exchange 没有发现能够与 RouteKey 匹配的 Queue,则会抛弃此消息

创建队列与绑定

新建一个交换器 ,类型选择 topic alt 主题模式1 添加匹配规则,添加后列表如下: alt 添加主题匹配规则 alt 添加主题规则后 设置匹配主题

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
  /**
     * 主题模式1
     */
    @Test
    public void TopicTest(){

        rabbitTemplate.convertAndSend("JeremyTopic","Jeremy.SZE","匹配单个主题模式");
    }

    /**
     * 主题二
     */
    @Test
    public void  TopicTest1(){
        rabbitTemplate.convertAndSend("JeremyTopic","123.log","匹配多个主题模式");
    }

    @Test
    public void  TopicTest2(){
        rabbitTemplate.convertAndSend("JeremyTopic","Jeremy.log","匹配单个主题模式");
    }

消费者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Component
@RabbitListener(queues = "JeremyFanout")
public class ConsumerTopic1 {

    @RabbitHandler
    public void JeremyFanoutTest(String msg){

        System.out.println("匹配单个主题模式:"+msg);
    }
}

使用阿里云大于发送短信

阿里大于简介

  阿里大于是阿里云旗下产品,融合了三大运营商的通信能力,通过将传统通信业务和能力与互联网相结合,创新融合阿里巴巴生态内容,全力为中小企业和开发者提供优质服务阿里大于提供包括短信、语音、流量直充、私密专线、店铺手机号等个性化服务。通过阿里大于打通三大运营商通信能力,全面融合阿里巴巴生态,以开放API及SDK的方式向开发者提供通信和数据服务,更好地支撑企业业务发展和创新服务。

准备工作

注册账户

首先我们先进入“阿里大于”www.alidayu.com(https://dayu.aliyun.com/)
注册账号后,再在手机下载“阿里云”应用,登录,然后进行在线实名认证。
alt 点击管理后台 点击进入控制台: alt 点击短信服务 点击国内消息 alt 点击国内消息 alt 添加签名 申请模板 alt 申请模板 alt 提交申请模板

创建accesskeys

alt 点击AccessKey alt 继续使用 alt 创建Accesskey

下载SDK及DEMO

alt 下载SDKDEMO 下载地址:https://help.aliyun.com/document_detail/55359.html

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  /**
     * 生成验证码
     * @param mobile
     * @return
     */
    @RequestMapping(value = "/sendsms/{mobile}",method = RequestMethod.POST)
    public Result saveCodeAndMobile(@PathVariable String mobile){

        userService.saveCodeAndMobile(mobile);
        return new Result(true,StatusCode.OK,"保存验证码和手机号成功");
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
 /**
     * 添加手机号和验证码到MQ及Redis
     */
    public void saveCodeAndMobile(String mobile){

        //生成验证码
        String randomCode = RandomStringUtils.randomNumeric(6);

        redisTemplate.opsForValue().set("smsCode_"+mobile,randomCode,5, TimeUnit.MINUTES);
        Map<String,Object> map=new HashMap<String, Object>();
        map.put("code",randomCode);
        map.put("mobile",mobile);
        rabbitTemplate.convertAndSend("sms",map);
    }

注册用户

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
 /**
     * 注册
     * @param user
     * @param code
     */
    @RequestMapping(value = "/register/{code}",method = RequestMethod.POST)
    public Result register(@RequestBody User user, @PathVariable String code){
         if (code==null||"".equals(code)){
        return new Result(false,StatusCode.ERROR,"请输入验证码");

         }
        String rediscode=(String) redisTemplate.opsForValue().get("smsCode_"+user.getMobile());
         if (rediscode==null || !rediscode.equals(code)){

        return new Result(false,StatusCode.ERROR,"您输入的验证码不对");
         }

           userService.register(user);

        return new Result(true,StatusCode.OK,"注册成功");
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

    /**
     * 注册
     * @param user
     *
     */
    public void  register(User user){

        user.setId(idWorker.nextId()+"");
        user.setRegdate(new Date());
        user.setFanscount(0);
        user.setFollowcount(0);
        userDao.save(user);
    }

短信监听

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.tensquare.sms.listener;

import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
import com.rabbitmq.client.Channel;
import com.tensquare.sms.until.SmsUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;
/**
 * 短信监听
 */
@Component
@RabbitListener(queues = "sms")
public class SmsListener {

    @Autowired
    private SmsUtil smsUtil;

    @Value("${aliyun.sms.temp_code}")
    private String tempCode;

    @Value("${aliyun.sms.sign_name}")
    private String signName;

    /**
     * 处理方法
     */
    @RabbitHandler
    public void handleMsg(Map<String,String> msg, Channel channel, Message message){
        System.out.println("手机号码:"+msg.get("mobile"));
        System.out.println("验证码:"+msg.get("code"));

        //使用阿里云短信发送手机验证码
        try {
            SendSmsResponse sendSmsResponse = smsUtil.sendSms(msg.get("mobile"), tempCode, signName, "{\"code\":\"" + msg.get("code") + "\"}");

            if(sendSmsResponse.getCode().equals("OK")){
                System.out.println("短信发送成功");
                //手动发送确认消息给MQ,让MQ删除对应的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }else{
                System.out.println("短信发送失败:"+sendSmsResponse.getCode());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

短信工具类

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
package com.tensquare.sms.until;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsRequest;
import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsResponse;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * 短信工具类
 * @author Administrator
 *
 */
@Component
public class SmsUtil {

    //产品名称:云通信短信API产品,开发者无需替换
    static final String product = "Dysmsapi";
    //产品域名,开发者无需替换
    static final String domain = "dysmsapi.aliyuncs.com";

    @Autowired
    private Environment env;

    // TODO 此处需要替换成开发者自己的AK(在阿里云访问控制台寻找)

    /**
     * 发送短信
     * @param mobile 手机号
     * @param template_code 模板号
     * @param sign_name 签名
     * @param param 参数
     * @return
     * @throws ClientException
     */
    public SendSmsResponse sendSms(String mobile,String template_code,String sign_name,String param) throws ClientException {
        String accessKeyId =env.getProperty("aliyun.sms.accessKeyId");
        String accessKeySecret = env.getProperty("aliyun.sms.accessKeySecret");
        //可自助调整超时时间
        System.setProperty("sun.net.client.defaultConnectTimeout", "10000");
        System.setProperty("sun.net.client.defaultReadTimeout", "10000");
        //初始化acsClient,暂不支持region化
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
        DefaultProfile.addEndpoint("cn-hangzhou", "cn-hangzhou", product, domain);
        IAcsClient acsClient = new DefaultAcsClient(profile);
        //组装请求对象-具体描述见控制台-文档部分内容
        SendSmsRequest request = new SendSmsRequest();
        //必填:待发送手机号
        request.setPhoneNumbers(mobile);
        //必填:短信签名-可在短信控制台中找到
        request.setSignName(sign_name);
        //必填:短信模板-可在短信控制台中找到
        request.setTemplateCode(template_code);
        //可选:模板中的变量替换JSON串,如模板内容为"亲爱的${name},您的验证码为${code}"时,此处的值为
        request.setTemplateParam(param);
        //选填-上行短信扩展码(无特殊需求用户请忽略此字段)
        //request.setSmsUpExtendCode("90997");
        //可选:outId为提供给业务方扩展字段,最终在短信回执消息中将此值带回给调用者
        request.setOutId("yourOutId");
        //hint 此处可能会抛出异常,注意catch
        SendSmsResponse sendSmsResponse = acsClient.getAcsResponse(request);
        return sendSmsResponse;
    }

    public  QuerySendDetailsResponse querySendDetails(String mobile,String bizId) throws ClientException {
        String accessKeyId =env.getProperty("accessKeyId");
        String accessKeySecret = env.getProperty("accessKeySecret");
        //可自助调整超时时间
        System.setProperty("sun.net.client.defaultConnectTimeout", "10000");
        System.setProperty("sun.net.client.defaultReadTimeout", "10000");
        //初始化acsClient,暂不支持region化
        IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
        DefaultProfile.addEndpoint("cn-hangzhou", "cn-hangzhou", product, domain);
        IAcsClient acsClient = new DefaultAcsClient(profile);
        //组装请求对象
        QuerySendDetailsRequest request = new QuerySendDetailsRequest();
        //必填-号码
        request.setPhoneNumber(mobile);
        //可选-流水号
        request.setBizId(bizId);
        //必填-发送日期 支持30天内记录查询,格式yyyyMMdd
        SimpleDateFormat ft = new SimpleDateFormat("yyyyMMdd");
        request.setSendDate(ft.format(new Date()));
        //必填-页大小
        request.setPageSize(10L);
        //必填-当前页码从1开始计数
        request.setCurrentPage(1L);
        //hint 此处可能会抛出异常,注意catch
        QuerySendDetailsResponse querySendDetailsResponse = acsClient.getAcsResponse(request);
        return querySendDetailsResponse;
    }
}