简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,
如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。
用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

个人想到最常用的高并发场景可能是秒杀,这里自己做了一个简单的demo,进行了实践。

项目地址: rabbitmq秒杀实践

第一步: 新建springboot项目,引入依赖,增加库表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<!--消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.2.0</version>
</dependency>
</dependencies>

这里springboot的版本为2.2.0,持久层选择mybatis-plus,消息队列配合redis实现高并发场景下的削峰,主要是redis有优秀的读写速度,可以快速处理请求,将请求结果处理后,返回用户,通过消息队列暂存结果,监听队列,程序持久化结果到数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/*
Navicat MariaDB Data Transfer

Source Server : localhost
Source Server Version : 100214
Source Host : localhost:3308
Source Database : seconds_kill

Target Server Type : MariaDB
Target Server Version : 100214
File Encoding : 65001

Date: 2019-12-26 18:30:26
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for goods
-- ----------------------------
DROP TABLE IF EXISTS `goods`;
CREATE TABLE `goods` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`goods_name` varchar(255) NOT NULL COMMENT '商品名称',
`store` int(11) NOT NULL COMMENT '价格',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of goods
-- ----------------------------
INSERT INTO `goods` VALUES ('1', 'watch', '150');
INSERT INTO `goods` VALUES ('2', 'pencil', '150');

-- ----------------------------
-- Table structure for orders
-- ----------------------------
DROP TABLE IF EXISTS `orders`;
CREATE TABLE `orders` (
`id` varchar(255) NOT NULL,
`order_name` varchar(255) NOT NULL,
`order_user` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=36956 DEFAULT CHARSET=utf8mb4;

orders订单表id,考虑到实际业务,订单id必须唯一,使用mybatis-plus的分布式全局唯一id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.sonake.seconds.kill.demo.domain;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

/**
* @author :xzyuan
* @date :Created in 2019/12/26 13:42
* @description
* @version:
*/
@TableName("orders")
@Data
public class Orders {
@TableId(type = IdType.ID_WORKER_STR)
private String id;
private String orderName;
private String orderUser;
}

第二步:添加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
spring:
application:
name: seconds_kill
datasource:
url: jdbc:mysql://127.0.0.1:3308/seconds_kill?rewriteBatchedStatements=true&serverTimezone=GMT%2b8
password: root
username: root
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
connection-timeout: 30000
max-lifetime: 1800000
max-pool-size: 15
min-idle: 200
connection-test-query: select 1
rabbitmq:
virtual-host: /
host: localhost
username: guest
password: guest
listener:
simple:
retry:
enabled: true
max-attempts: 10 #最大重试次数
max-interval: 10000 #重试最大时间间隔(ms)
initial-interval: #c重试间隔时间(ms)
multiplier: 5 #应用于前一重试间隔的乘法器。

redis:
host: localhost
port: 6379
jedis:
pool:
max-active: 1024
max-wait: -1ms
max-idle: 200
database: 2

第三步:代码实现

这里主要说一下与消息队列主要的代码,数据库常用的crud不做展示,具体可看项目demo

  1. rabbitmq的配置,有关rabbitmq的参数可以自行百度,
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    package com.sonake.seconds.kill.demo.config;

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
    * @author :xzyuan
    * @date :Created in 2019/12/26 14:19
    * @description
    * @version:
    */
    @Configuration
    public class RabbitMQConfig {
    //库存交换机
    public static final String STORY_EXCHANGE = "STORY_EXCHANGE";

    //订单交换机
    public static final String ORDER_EXCHANGE = "ORDER_EXCHANGE";

    //库存队列
    public static final String STORY_QUEUE = "STORY_QUEUE";

    //订单队列
    public static final String ORDER_QUEUE = "ORDER_QUEUE";

    //库存路由键
    public static final String STORY_ROUTING_KEY = "STORY_ROUTING_KEY";

    //订单路由键
    public static final String ORDER_ROUTING_KEY = "ORDER_ROUTING_KEY";

    @Bean
    public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
    }


    //创建库存交换机
    @Bean
    public Exchange getStoryExchange() {
    return ExchangeBuilder.directExchange(STORY_EXCHANGE).durable(true).build();
    }

    //创建库存队列
    @Bean
    public Queue getStoryQueue() {
    return new Queue(STORY_QUEUE);
    }

    //库存交换机和库存队列绑定
    @Bean
    public Binding bindStory() {
    return BindingBuilder.bind(getStoryQueue()).to(getStoryExchange()).with(STORY_ROUTING_KEY).noargs();
    }

    //创建订单队列
    @Bean
    public Queue getOrderQueue() {
    return new Queue(ORDER_QUEUE);
    }

    //创建订单交换机
    @Bean
    public Exchange getOrderExchange() {
    return ExchangeBuilder.directExchange(ORDER_EXCHANGE).durable(true).build();
    }

    //订单队列与订单交换机进行绑定
    @Bean
    public Binding bindOrder() {
    return BindingBuilder.bind(getOrderQueue()).to(getOrderExchange()).with(ORDER_ROUTING_KEY).noargs();
    }

    }

库存初始化至redis

在开发中可能会有这样的情景。需要在容器启动的时候执行一些内容。比如读取配置文件,数据库连接之类的。
SpringBoot给我们提供了两个接口来帮助我们实现这种需求。这两个接口分别为CommandLineRunner和ApplicationRunner。
他们的执行时机为容器启动完成的时候。这里选用ApplicationRunner。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.sonake.seconds.kill.demo.runner;

import com.sonake.seconds.kill.demo.domain.Goods;
import com.sonake.seconds.kill.demo.service.GoodsService;
import com.sonake.seconds.kill.demo.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.List;

/**
* @author :xzyuan
* @date :Created in 2019/12/26 11:39
* @description:项目启动数据初始化
* @version: 1.0
*/
@Component
@Slf4j
public class ApplicationStartupRunner implements ApplicationRunner {
@Autowired
private GoodsService goodsService;
@Autowired
private RedisService redisService;
@Override
public void run(ApplicationArguments args) throws Exception {
List<Goods> list = goodsService.list();
list.forEach(goods -> {
redisService.put(goods.getGoodsName(),goods.getStore(),20);
});
log.info("商品库存初始化至redis完毕");
}
}

效果如图:

redis持久化.png
redis持久化.png
  1. 秒杀请求发起

在这里主要写了两个请求,主要作用见注释.
在使用redis+rebbitmq实现秒杀这个接口里,并没有针对数据库的操作,是针对redis做了数据增减,然后将下单结果告知rabbitmq并实现了暂存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.sonake.seconds.kill.demo.controller;

import com.sonake.seconds.kill.demo.config.RabbitMQConfig;
import com.sonake.seconds.kill.demo.domain.Orders;
import com.sonake.seconds.kill.demo.service.GoodsService;
import com.sonake.seconds.kill.demo.service.OrderService;
import com.sonake.seconds.kill.demo.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
* @author :xzyuan
* @date :Created in 2019/12/26 14:22
* @description
* @version:
*/
@Controller
@Slf4j
public class SecController {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private RedisService redisService;

@Autowired
private OrderService orderService;

@Autowired
private GoodsService goodsService;

/**
* 使用redis+消息队列进行秒杀实现
*
* @param username
* @param goodsName
* @return
*/
@RequestMapping("/sec")
@ResponseBody
public String sec(@RequestParam(value = "username") String username, @RequestParam(value = "goodsName") String goodsName) {
log.info("参加秒杀的用户是:{},秒杀的商品是:{}", username, goodsName);
String message = null;
//调用redis给相应商品库存量减一
Long decrByResult = redisService.decrBy(goodsName);
if (decrByResult >= 0) {
/**
* 说明该商品的库存量有剩余,可以进行下订单操作
*/
log.info("用户:{}秒杀该商品:剩余{}库存,可以进行下订单操作", username, decrByResult, goodsName);
//发消息给库存消息队列,将库存数据减一
rabbitTemplate.convertAndSend(RabbitMQConfig.STORY_EXCHANGE, RabbitMQConfig.STORY_ROUTING_KEY, goodsName);

//发消息给订单消息队列,创建订单
Orders orders = new Orders();
orders.setOrderName(goodsName);
orders.setOrderUser(username);
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, RabbitMQConfig.ORDER_ROUTING_KEY, orders);
message = "用户" + username + "秒杀" + goodsName + "成功";
} else {
/**
* 说明该商品的库存量没有剩余,直接返回秒杀失败的消息给用户
*/
log.info("用户:{}秒杀时商品的库存量没有剩余,秒杀结束", username);
message = username + "商品的库存量没有剩余,秒杀结束";
}
return message;
}

/**
* 实现纯数据库操作实现秒杀操作
*
* @param username
* @param goodsName
* @return
*/
@RequestMapping("/secDataBase")
@ResponseBody
public String secDataBase(@RequestParam(value = "username") String username, @RequestParam(value = "goodsName") String goodsName) {
log.info("参加秒杀的用户是:{},秒杀的商品是:{}", username, goodsName);
return goodsService.secData(username,goodsName);
}
}
  1. 消息队列的监听并消费
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    package com.sonake.seconds.kill.demo.service;

    import com.sonake.seconds.kill.demo.config.RabbitMQConfig;
    import com.sonake.seconds.kill.demo.domain.Orders;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
    * @author :xzyuan
    * @date :Created in 2019/12/26 14:49
    * @description:订单监听
    * @version:
    */
    @Service
    @Slf4j
    public class MQOrderService {
    @Autowired
    private OrderService orderService;

    /**
    * 监听订单消息队列,并消费
    *
    * @param orders
    */
    @RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)
    public void createOrder(Orders orders) {
    log.info("收到订单消息,订单用户为:{},商品名称为:{}", orders.getOrderUser(), orders.getOrderName());
    /**
    * 调用数据库orderService创建订单信息
    */
    //int s =Integer.valueOf("ssss");
    orderService.save(orders);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    package com.sonake.seconds.kill.demo.service;

    import com.sonake.seconds.kill.demo.config.RabbitMQConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
    * @author :xzyuan
    * @date :Created in 2019/12/26 14:47
    * @description
    * @version:
    */
    @Slf4j
    @Service
    public class MQStoreService {

    @Autowired
    private GoodsService goodsService;

    /**
    * 监听库存消息队列,并消费
    *
    * @param goodsName
    */
    @RabbitListener(queues = RabbitMQConfig.STORY_QUEUE)
    public void decrByStock(String goodsName) {
    log.info("库存消息队列收到的消息商品信息是:{}", goodsName);
    /**
    * 调用数据库service给数据库对应商品库存减一
    */
    //int s =Integer.valueOf("ssss");
    goodsService.decrByStore(goodsName);
    }
    }

测试

利用压测工具jmeter模拟秒杀.

点我下载,注意与jdk的版本对应关系

模拟300个用户秒杀

jmeter截图1.png
jmeter截图1.png
jmeter截图2.png
jmeter截图2.png

点击请求,发起秒杀
用户收到的秒杀结果

jmeter3.png
jmeter3.png
jmeter4.png
jmeter4.png
jmeter5.png
jmeter5.png

可以看到:有的用户秒杀成功,有的失败,因为初始化的商品库存共计200件,这里模拟300用户下单,故有100用户秒杀失败
消息队列接收到的数据

rabbitmq1.png
rabbitmq1.png

客户端监听消息队列并进行消费

rabbitmq2.png
rabbitmq2.png

纯数据库秒杀

数据库秒杀1.png
数据库秒杀1.png

可以发现,由于并发的操作,导致多个请求进来的时候读取的商品库存量都是一样的

数据库结果

数据库秒杀2.png
数据库秒杀2.png

我设置了20用户秒杀,下单都成功了,但是数据库库存只少了6个.
可以看出,由于并发的情况,导致订单超卖,所有用户都已经下单成功了,并且数据库中的库存也并未归零(随着并发量的增多,数据库中的库存会为0 ,但是会有更多的用户下单成功,也会有用户下单失败,秒杀结束

至此,这里使用了redis+rabbitmq实现了高并发秒杀场景,并有效的防止了超卖现象,同时听过使用纯数据库操作产生了超卖的现象。

另外,由于考虑实际场景,rabbitmq万一挂掉我们的消息也就丢失了,为了防止此类情况,可设置消息队列的持久化或者集群,有效避免此类情况,这里没有做展示,感兴趣的可自行百度。
配置文件中rabbitmq有关重试的设置,在设置了重试次数以后,若还是没有尝试成功,消息就会被丢弃,客户端也无法获取消息,默认是一直尝试的,此处可设置手动ack的确认,处理异常,
并进行相关的的消息补偿等,详情百度。