1.7 整合RabbitMQ

1.7.1 RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
在开发过程中,我们需要了解RabbitMQ的消息交换类型(Exchange 类型):

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。只说前三种模式。

  • (1) Direct模式

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配

  • (2) Topic模式

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配一个单词。

  • (3) Fanout模式

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

1.7.2 配置工程

Pom配置

1
2
3
4
5

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13

application-dev.properties 配置
#================== RabbitMq ===================#
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

#================== RabbitMq 队列配置 ===================#
mq.env=local
basic.info.mq.exchange.name=${mq.env}:sys:info:mq:exchange
basic.info.mq.routing.key.name=${mq.env}:sys:info:mq:routing:key
basic.info.mq.queue.name=${mq.env}:sys:info:mq:queue

创建RabbitmqConfig,在配置类中创建队列、交换机、路由及其绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

package com.zone7.demo.helloworld.config.rabbitmq;


import com.zone7.demo.helloworld.sys.service.impl.CustomerMqServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
* RabbitMq配置类
*
*/
@Configuration
public class RabbitMqConfig {

private static final Logger log= LoggerFactory.getLogger(RabbitMqConfig.class);

@Autowired
private Environment env;

@Autowired
private CachingConnectionFactory connectionFactory;

@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
return factory;
}

/**
* 多个消费者
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}


//TODO:基本消息模型构建

@Bean
public DirectExchange basicExchange(){
return new DirectExchange(env.getProperty("basic.info.mq.exchange.name"), true,false);
}

@Bean(name = "basicQueue")
public Queue basicQueue(){
return new Queue(env.getProperty("basic.info.mq.queue.name"), true);
}

@Bean
public Binding basicBinding(){
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("basic.info.mq.routing.key.name"));
}

}

1.7.3 案例开发

  • (1) 创建服务层代码
    首先开发一个带有消息接收监听功能和消息发送功能的服务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74

    package com.zone7.demo.helloworld.sys.service.impl;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.zone7.demo.helloworld.sys.service.RabbitService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.env.Environment;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Service;

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    /**
    * 消息监听器以及发送服务
    */
    @Service
    public class RabbitServiceImpl implements RabbitService {
    private static final Logger log= LoggerFactory.getLogger(RabbitServiceImpl.class);

    @Autowired
    private Environment env;

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
    * 消息消费
    * @param message
    */
    @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
    try {
    //TODO:接收String
    String result=new String(message,"UTF-8");
    log.info("接收String消息: {} ",result);
    }catch (Exception e){
    log.error("监听消费消息 发生异常: ",e.fillInStackTrace());
    }
    }

    /**
    * 消息发送
    * @param message
    */
    @Override
    public void sendMessage(String message) {
    try {
    log.info("待发送的消息: {} ",message);

    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
    rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));

    Message msg= MessageBuilder.withBody(message.getBytes(Charset.forName("UTF-8"))).build();
    rabbitTemplate.convertAndSend(msg);

    }catch (Exception e){
    log.error("发送简单消息发生异常: ",e.fillInStackTrace());
    }
    }


    }
  • (2) 创建控制层代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28

    package com.zone7.demo.helloworld.sys.controller;

    import com.zone7.demo.helloworld.commons.response.ResponseData;
    import com.zone7.demo.helloworld.sys.service.RabbitService;
    import com.zone7.demo.helloworld.sys.service.RedisService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;

    /**
    * @Author: zone7
    * @Date: 2019/06/17
    * @Version 1.0
    */
    @RestController
    @RequestMapping("/rabbit")
    public class RabbitController {
    @Autowired
    private RabbitService rabbitService;

    @GetMapping("/send/{message}")
    public ResponseData send(@PathVariable String message ){
    rabbitService.sendMessage(message);

    return ResponseData.successMessage("发送消息: "+message+"成功");
    }

    }
  • (3) 测试效果
    确保Rabbit已经启动,启动工程后在浏览器中输入
    http://localhost:8080/rabbit/send/hello
    我们将会看到以下信息:
    rabbit1
    同时查看控制台输出,确认消息发送成功并已经被消费。

    rabbit2
    rabbit3
    通过rabbit监控网页http://localhost:15672/ 我们可看到队列、交换机、路由等信息。
    rabbit4