高并发教程八:秒杀系统设计秒杀场景理解秒杀系统架构原则

秒杀场景

最典型的就是淘宝京东等电商双十一秒杀了,短时间上亿的用户涌入,瞬间流量巨大(高并发)。例如,200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量是有限的100件,这样真实能购买到该件商品的用户也只有100人及以下,不能卖超

但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品,但是,在抢购时间达到后,用户开始真正下单时,秒杀的服务器后端却不希望同时有几百万人同时发起抢购请求

我们都知道服务器的处理资源是有限的,所以出现峰值的时候,很容易导致服务器宕机,用户无法访问的情况出现,这就好比出行的时候存在早高峰和晚高峰的问题,为了解决这个问题,出行就有了错峰限行的解决方案

同理,在线上的秒杀等业务场景,也需要类似的解决方案,需要平安度过同时抢购带来的流量峰值的问题,这就是流量削峰的由来

理解秒杀系统

那么,如何才能更好地理解秒杀系统呢?我觉得作为一个程序员,你首先需要从高维度出发,从整体上思考问题。在我看来,秒杀其实主要解决两个问题,一个是并发读,一个是并发写。并发读的核心优化理念是尽量减少用户到服务端来“读”数据,或者让他们读更少的数据;并发写的处理原则也一样,它要求我们在数据库层面独立出来一个库,做特殊的处理。另外,我们还要针对秒杀系统做一些保护,针对意料之外的情况设计兜底方案,以防止最坏的情况发生。

而从一个架构师的角度来看,要想打造并维护一个超大流量并发读写、高性能、高可用的系统,在整个用户请求路径上从浏览器到服务端我们要遵循几个原则,就是要保证用户请求的数据尽量少、请求数尽量少、路径尽量短、依赖尽量少,并且不要有单点。这些关键点我会在后面的文章里重点讲解。

其实,秒杀的整体架构可以概括为“稳、准、快”几个关键字。

所谓“稳”,就是整个系统架构要满足高可用,流量符合预期时肯定要稳定,就是超出预期时也同样不能掉链子,你要保证秒杀活动顺利完成,即秒杀商品顺利地卖出去,这个是最基本的前提。

然后就是“准”,就是秒杀10台iPhone,那就只能成交10台,多一台少一台都不行。一旦库存不对,那平台就要承担损失,所以“准”就是要求保证数据的一致性。

最后再看“快”,“快”其实很好理解,它就是说系统的性能要足够高,否则你怎么支撑这么大的流量呢?不光是服务端要做极致的性能优化,而且在整个请求链路上都要做协同的优化,每个地方快一点,整个系统就完美了。

所以从技术角度上看“稳、准、快”,就对应了我们架构上的高可用、一致性和高性能的要求,我们的专栏也将主要围绕这几个方面来展开,具体如下:

  • 高性能。  秒杀涉及大量的并发读和并发写,因此支持高并发访问这点非常关键。将从设计数据的动静分离方案、热点的发现与隔离、请求的削峰与分层过滤、服务端的极致优化这4个方面重点介绍。
  • 一致性。  秒杀中商品减库存的实现方式同样关键。可想而知,有限数量的商品在同一时刻被很多倍的请求同时来减库存,减库存又分为“拍下减库存”“付款减库存”以及预扣等几种,在大并发更新的过程中都要保证数据的准确性,其难度可想而知。因此,我将用一篇文章来专门讲解如何设计秒杀减库存方案。
  • 高可用。  虽然我介绍了很多极致的优化思路,但现实中总难免出现一些我们考虑不到的情况,所以要保证系统的高可用和正确性,我们还要设计一个PlanB来兜底,以便在最坏情况发生时仍然能够从容应对。专栏的最后,我将带你思考可以从哪些环节来设计兜底方案。

架构原则“4要1不要”

数据要尽量少

  • 所谓“数据要尽量少”,首先是指用户请求的数据能少就少。请求的数据包括上传给系统的数据和系统返回给用户的数据(通常就是网页)。
  • 为啥“数据要尽量少”呢?因为首先这些数据在网络上传输需要时间,其次不管是请求数据还是返回数据都需要服务器做处理,而服务器在写网络时通常都要做压缩和字符编码,这些都非常消耗CPU,所以减少传输的数据量可以显著减少CPU的使用。例如,我们可以简化秒杀页面的大小,去掉不必要的页面装修效果,等等。
  • 其次,“数据要尽量少”还要求系统依赖的数据能少就少,包括系统完成某些业务逻辑需要读取和保存的数据,这些数据一般是和后台服务以及数据库打交道的。调用其他服务会涉及数据的序列化和反序列化,而这也是CPU的一大杀手,同样也会增加延时。而且,数据库本身也容易成为一个瓶颈,所以和数据库打交道越少越好,数据越简单、越小则越好

请求数要尽量少

  • 用户请求的页面返回后,浏览器渲染这个页面还要包含其他的额外请求,比如说,这个页面依赖的CSS/JavaScript、图片,以及Ajax请求等等都定义为“额外请求”,这些额外请求应该尽量少。因为浏览器每发出一个请求都多少会有一些消耗,例如建立连接要做三次握手,有的时候有页面依赖或者连接数限制,一些请求(例如JavaScript)还需要串行加载等。另外,如果不同请求的域名不一样的话,还涉及这些域名的DNS解析,可能会耗时更久。所以你要记住的是,减少请求数可以显著减少以上这些因素导致的资源消耗。
  • 例如,减少请求数最常用的一个实践就是合并CSS和JavaScript文件,把多个JavaScript文件合并成一个文件,在URL中用逗号隔开(g.xxx.com/tm/xx-b/4.0…??module-preview/index.xtpl.js,module-jhs/index.xtpl.js,module-focus/index.xtpl.js)。这种方式在服务端仍然是单个文件各自存放,只是服务端会有一个组件解析这个URL,然后动态把这些文件合并起来一起返回

路径要尽量短

  • 所谓“路径”,就是用户发出请求到返回数据这个过程中,需求经过的中间的节点数。
  • 通常,这些节点可以表示为一个系统或者一个新的Socket连接(比如代理服务器只是创建一个新的Socket连接来转发请求)。每经过一个节点,一般都会产生一个新的Socket连接。
  • 然而,每增加一个连接都会增加新的不确定性。从概率统计上来说,假如一次请求经过5个节点,每个节点的可用性是99.9%的话,那么整个请求的可用性是:99.9%的5次方,约等于99.5%。
  • 所以缩短请求路径不仅可以增加可用性,同样可以有效提升性能(减少中间节点可以减少数据的序列化与反序列化),并减少延时(可以减少网络传输耗时)。
  • 要缩短访问路径有一种办法,就是多个相互强依赖的应用合并部署在一起,把远程过程调用(RPC)变成JVM内部之间的方法调用。在《大型网站技术架构演进与性能优化》一书中,我也有一章介绍了这种技术的详细实现。

依赖要尽量少

  • 所谓依赖,指的是要完成一次用户请求必须依赖的系统或者服务,这里的依赖指的是强依赖。
  • 举个例子,比如说你要展示秒杀页面,而这个页面必须强依赖商品信息、用户信息,还有其他如优惠券、成交列表等这些对秒杀不是非要不可的信息(弱依赖),这些弱依赖在紧急情况下就可以去掉。
  • 要减少依赖,我们可以给系统进行分级,比如0级系统、1级系统、2级系统、3级系统,0级系统如果是最重要的系统,那么0级系统强依赖的系统也同样是最重要的系统,以此类推。
  • 注意,0级系统要尽量减少对1级系统的强依赖,防止重要的系统被不重要的系统拖垮。例如支付系统是0级系统,而优惠券是1级系统的话,在极端情况下可以把优惠券给降级,防止支付系统被优惠券这个1级系统给拖垮。

不要有单点

  • 系统中的单点可以说是系统架构上的一个大忌,因为单点意味着没有备份,风险不可控,我们设计分布式系统最重要的原则就是“消除单点”

不同场景下的不同架构案例

随着请求量的加大(比如从1w/s到了10w/s的量级),这个简单的架构很快就遇到了瓶颈,因此需要做架构改造来提升系统性能。这些架构改造包括:

  • 把秒杀系统独立出来单独打造一个系统,这样可以有针对性地做优化,例如这个独立出来的系统就减少了店铺装修的功能,减少了页面的复杂度;
  • 在系统部署上也独立做一个机器集群,这样秒杀的大流量就不会影响到正常的商品购买集群的机器负载;
  • 将热点数据(如库存数据)单独放到一个缓存系统中,以提高“读性能”;
  • 增加秒杀答题,防止有秒杀器抢单

image.png

然而这个架构仍然支持不了超过100w/s的请求量,所以为了进一步提升秒杀系统的性能,我们又对架构做进一步升级,比如:

  • 对页面进行彻底的动静分离,使得用户秒杀时不需要刷新整个页面,而只需要点击抢宝按钮,借此把页面刷新的数据降到最少;
  • 在服务端对秒杀商品进行本地缓存,不需要再调用依赖系统的后台服务获取数据,甚至不需要去公共的缓存集群中查询数据,这样不仅可以减少系统调用,而且能够避免压垮公共缓存集群。
  • 增加系统限流保护,防止最坏情况发生

image.png

如何做好动静分离

何为动静数据

那到底什么才是动静分离呢?所谓“动静分离”,其实就是把用户请求的数据(如HTML页面)划分为“动态数据”和“静态数据”。

简单来说, “动态数据”和“静态数据”的主要区别就是看页面中输出的数据是否和URL、浏览者、时间、地域相关,以及是否含有Cookie等私密数据。比如说:

  1. 很多媒体类的网站,某一篇文章的内容不管是你访问还是我访问,它都是一样的。所以它就是一个典型的静态数据,但是它是个动态页面。
  2. 我们如果现在访问淘宝的首页,每个人看到的页面可能都是不一样的,淘宝首页中包含了很多根据访问者特征推荐的信息,而这些个性化的数据就可以理解为动态数据了。

理解了静态数据和动态数据,我估计你很容易就能想明白“动静分离”这个方案的来龙去脉了。分离了动静数据,我们就可以对分离出来的静态数据做缓存,有了缓存之后,静态数据的“访问效率”自然就提高了。

那么,怎样对静态数据做缓存呢?我在这里总结了几个重点:

  • 第一,你应该把静态数据缓存到离用户最近的地方。静态数据就是那些相对不会变化的数据,因此我们可以把它们缓存起来。缓存到哪里呢?常见的就三种,用户浏览器里、CDN上或者在服务端的Cache中。你应该根据情况,把它们尽量缓存到离用户最近的地方。

  • 第二,静态化改造就是要直接缓存HTTP连接。相较于普通的数据缓存而言,你肯定还听过系统的静态化改造。静态化改造是直接缓存HTTP连接而不是仅仅缓存数据,如下图所示,Web代理服务器根据请求URL,直接取出对应的HTTP响应头和响应体然后直接返回,这个响应过程简单得连HTTP协议都不用重新组装,甚至连HTTP请求头也不需要解析。

  • 第三,让谁来缓存静态数据也很重要。不同语言写的Cache软件处理缓存数据的效率也各不相同。以Java为例,因为Java系统本身也有其弱点(比如不擅长处理大量连接请求,每个连接消耗的内存较多,Servlet容器解析HTTP协议较慢),所以你可以不在Java层做缓存,而是直接在Web服务器层上做,这样你就可以屏蔽Java语言层面的一些弱点;而相比起来,Web服务器(如Nginx、Apache、Varnish)也更擅长处理大并发的静态文件请求

image.png

动静分离架构方案

在将整个系统做动静分离后,我们自然会想到更进一步的方案,就是将Cache进一步前移到CDN上,因为CDN离用户最近,效果会更好。

但是要想这么做,有以下几个问题需要解决。

  • 失效问题。前面我们也有提到过缓存时效的问题,不知道你有没有理解,我再来解释一下。谈到静态数据时,我说过一个关键词叫“相对不变”,它的言外之意是“可能会变化”。比如一篇文章,现在不变,但如果你发现个错别字,是不是就会变化了?如果你的缓存时效很长,那用户端在很长一段时间内看到的都是错的。所以,这个方案中也是,我们需要保证CDN可以在秒级时间内,让分布在全国各地的Cache同时失效,这对CDN的失效系统要求很高。
  • 命中率问题。Cache最重要的一个衡量指标就是“高命中率”,不然Cache的存在就失去了意义。同样,如果将数据全部放到全国的CDN上,必然导致Cache分散,而Cache分散又会导致访问请求命中同一个Cache的可能性降低,那么命中率就成为一个问题。
  • 发布更新问题。如果一个业务系统每周都有日常业务需要发布,那么发布系统必须足够简洁高效,而且你还要考虑有问题时快速回滚和排查问题的简便性

从前面的分析来看,将商品详情系统放到全国的所有CDN节点上是不太现实的,因为存在失效问题、命中率问题以及系统的发布更新问题。那么是否可以选择若干个节点来尝试实施呢?答案是“可以”,但是这样的节点需要满足几个条件:

  • 靠近访问量比较集中的地区;
  • 离主站相对较远;
  • 节点到主站间的网络比较好,而且稳定;
  • 节点容量比较大,不会占用其他CDN太多的资源
  • 节点不要太多

基于上面几个因素,选择CDN的二级Cache比较合适,因为二级Cache数量偏少,容量也更大,让用户的请求先回源的CDN的二级Cache中,如果没命中再回源站获取数据,部署方式如下图所示:

image.png

使用CDN的二级Cache作为缓存,可以达到和当前服务端静态化Cache类似的命中率,因为节点数不多,Cache不是很分散,访问量也比较集中,这样也就解决了命中率问题,同时能够给用户最好的访问体验,是当前比较理想的一种CDN化方案。除此之外,CDN化部署方案还有以下几个特点:

  • 把整个页面缓存在用户浏览器中;
  • 如果强制刷新整个页面,也会请求CDN;
  • 实际有效请求,只是用户对“刷新抢宝”按钮的点击。

这样就把90%的静态数据缓存在了用户端或者CDN上,当真正秒杀时,用户只需要点击特殊的“刷新抢宝”按钮,而不需要刷新整个页面。这样一来,系统只是向服务端请求很少的有效数据,而不需要重复请求大量的静态数据。

秒杀的动态数据和普通详情页面的动态数据相比更少,性能也提升了3倍以上。所以“抢宝”这种设计思路,让我们不用刷新页面就能够很好地请求到服务端最新的动态数据。

处理好系统的“热点数据”

假设你的系统中存储有几十亿上百亿的商品,而每天有千万级的商品被上亿的用户访问,那么肯定有一部分被大量用户访问的热卖商品,这就是我们常说的“热点商品”。

为什么要关注热点

首先,热点请求会大量占用服务器处理资源,虽然这个热点可能只占请求总量的亿分之一,然而却可能抢占90%的服务器资源,如果这个热点请求还是没有价值的无效请求,那么对系统资源来说完全是浪费

什么是“热点”

热点分为热点操作热点数据

所谓“热点操作”,例如大量的刷新页面、大量的添加购物车、双十一零点大量的下单等都属于此类操作。对系统来说,这些操作可以抽象为“读请求”和“写请求”,这两种热点请求的处理方式大相径庭,读请求的优化空间要大一些,而写请求的瓶颈一般都在存储层,优化的思路就是根据CAP理论做平衡

“热点数据”比较好理解,那就是用户的热点请求对应的数据。而热点数据又分为“静态热点数据”和“动态热点数据”

  • 所谓“静态热点数据”,就是能够提前预测的热点数据。例如,我们可以通过卖家报名的方式提前筛选出来,通过报名系统对这些热点商品进行打标。另外,我们还可以通过大数据分析来提前发现热点商品,比如我们分析历史成交记录、用户的购物车记录,来发现哪些商品可能更热门、更好卖,这些都是可以提前分析出来的热点
  • 所谓“动态热点数据”,就是不能被提前预测到的,系统在运行过程中临时产生的热点。例如,卖家在抖音上做了广告,然后商品一下就火了,导致它在短时间内被大量购买

发现热点数据

发现静态热点数据

如前面讲的,静态热点数据可以通过商业手段,例如强制让卖家通过报名参加的方式提前把热点商品筛选出来,实现方式是通过一个运营系统,把参加活动的商品数据进行打标,然后通过一个后台系统对这些热点商品进行预处理,如提前进行缓存。但是这种通过报名提前筛选的方式也会带来新的问题,即增加卖家的使用成本,而且实时性较差,也不太灵活。

  • 不过,除了提前报名筛选这种方式,你还可以通过技术手段提前预测,例如对买家每天访问的商品进行大数据计算,然后统计出TOP N的商品,我们可以认为这些TOP N的商品就是热点商品。

发现动态热点数据

动态热点发现系统的具体实现

  1. 构建一个异步的系统,它可以收集交易链路上各个环节中的中间件产品的热点Key,如Nginx、缓存、RPC服务框架等这些中间件(一些中间件产品本身已经有热点统计模块)。
  2. 建立一个热点上报和可以按照需求订阅的热点服务的下发规范,主要目的是通过交易链路上各个系统(包括详情、购物车、交易、优惠、库存、物流等)访问的时间差,把上游已经发现的热点透传给下游系统,提前做好保护。比如,对于大促高峰期,详情系统是最早知道的,在统一接入层上Nginx模块统计的热点URL。
  3. 将上游系统收集的热点数据发送到热点服务台,然后下游系统(如交易系统)就会知道哪些商品会被频繁调用,然后做热点保护。

这里我给出了一个图,其中用户访问商品时经过的路径有很多,我们主要是依赖前面的导购页面(包括首页、搜索页面、商品详情、购物车等)提前识别哪些商品的访问量高,通过这些系统中的中间件来收集热点数据,并记录到日志中。

image.png

我们通过部署在每台机器上的Agent把日志汇总到聚合和分析集群中,然后把符合一定规则的热点数据,通过订阅分发系统再推送到相应的系统中。你可以是把热点数据填充到Cache中,或者直接推送到应用服务器的内存中,还可以对这些数据进行拦截,总之下游系统可以订阅这些数据,然后根据自己的需求决定如何处理这些数据

流量削峰

削峰从本质上来说就是更多地延缓用户请求,以及层层过滤用户的访问需求,遵从最后落地到数据库的请求数要尽量少的原则
流量削峰主要有三种操作思路(排队,答题,过滤),简单说下

  1. 排队最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去,在这里,消息队列就像水库一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的
  2. 答题目的其实就是延缓请求,起到对请求流量进行削峰的作用,从而让系统能够更好地支持瞬时的流量高峰
  3. 前面介绍的排队和答题,要么是在接收请求时做缓冲,要么是减少请求的同时发送,而针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求,从Web层接到请求,到缓存,消息队列,最终到数据库这样就像漏斗一样,尽量把数据量和请求量一层一层地过滤和减少了,最终,到漏斗最末端(数据库)的才是有效请求

几种流量消峰方案

排队

要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。在这里,消息队列就像“水库”一样, 拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的

image.png
但是,如果流量峰值持续一段时间达到了消息队列的处理上限,例如本机的消息积压达到了存储空间的上限,消息队列同样也会被压垮,这样虽然保护了下游的系统,但是和直接把请求丢弃也没多大的区别。就像遇到洪水爆发时,即使是有水库恐怕也无济于事。

答题

你是否还记得,最早期的秒杀只是纯粹地刷新页面和点击购买按钮,它是后来才增加了答题功能的。那么,为什么要增加答题功能呢?

这主要是为了增加购买的复杂度,从而达到两个目的。

第一个目的是防止部分买家使用秒杀器在参加秒杀时作弊。2011年秒杀非常火的时候,秒杀器也比较猖獗,因而没有达到全民参与和营销的目的,所以系统增加了答题来限制秒杀器。增加答题后,下单的时间基本控制在2s后,秒杀器的下单比例也大大下降。答题页面如下图所示。

第二个目的其实就是延缓请求,起到对请求流量进行削峰的作用,从而让系统能够更好地支持瞬时的流量高峰。这个重要的功能就是把峰值的下单请求拉长,从以前的1s之内延长到2s~10s。这样一来,请求峰值基于时间分片了。这个时间的分片对服务端处理并发非常重要,会大大减轻压力。而且,由于请求具有先后顺序,靠后的请求到来时自然也就没有库存了,因此根本到不了最后的下单步骤,所以真正的并发写就非常有限了。这种设计思路目前用得非常普遍,如当年支付宝的“咻一咻”、微信的“摇一摇”都是类似的方式。

这里,我重点说一下秒杀答题的设计思路。

分层过滤

前面介绍的排队和答题要么是少发请求,要么对发出来的请求进行缓冲,而针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求。分层过滤其实就是采用“漏斗”式设计来处理请求的,如下图所示

image.png

分层过滤的核心思想是:在不同的层次尽可能地过滤掉无效请求,让“漏斗”最末端的才是有效请求。而要达到这种效果,我们就必须对数据做分层的校验

分层校验的基本原则是:

  • 将动态请求的读数据缓存(Cache)在Web端,过滤掉无效的数据读;
  • 对读数据不做强一致性校验,减少因为一致性校验产生瓶颈的问题;
  • 对写数据进行基于时间的合理分片,过滤掉过期的失效请求;
  • 对写请求做限流保护,将超出系统承载能力的请求过滤掉;
  • 对写数据进行强一致性校验,只保留最后有效的数据

分层校验的目的是:在读系统中,尽量减少由于一致性校验带来的系统瓶颈,但是尽量将不影响性能的检查条件提前,如用户是否具有秒杀资格、商品状态是否正常、用户答题是否正确、秒杀是否已经结束、是否非法请求、营销等价物是否充足等;在写数据系统中,主要对写的数据(如“库存”)做一致性检查,最后在数据库层保证数据的最终准确性(如“库存”不能减为负数)

秒杀实践(纯后端设计部分)

  1. 用户通过前端校验最终发起请求到后端
  2. 然后校验库存,扣库存,创建订单
  3. 最终数据落地,持久化保存

准备工作

数据库

--- 删除数据库
drop database seckill;
--- 创建数据库
create database seckill;
--- 使用数据库
use seckill;
--- 创建库存表
DROP TABLE IF EXISTS `t_seckill_stock`;
CREATE TABLE `t_seckill_stock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '库存ID',
  `name` varchar(50) NOT NULL DEFAULT 'OnePlus 7 Pro' COMMENT '名称',
  `count` int(11) NOT NULL COMMENT '库存',
  `sale` int(11) NOT NULL COMMENT '已售',
  `version` int(11) NOT NULL COMMENT '乐观锁,版本号',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='库存表';
--- 插入一条商品,初始化10个库存
INSERT INTO `t_seckill_stock` (`count`, `sale`, `version`) VALUES ('10', '0', '0');
--- 创建库存订单表
DROP TABLE IF EXISTS `t_seckill_stock_order`;
CREATE TABLE `t_seckill_stock_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `stock_id` int(11) NOT NULL COMMENT '库存ID',
  `name` varchar(30) NOT NULL DEFAULT 'OnePlus 7 Pro' COMMENT '商品名称',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='库存订单表';

复制代码

缓存

1、安装
brew install redis

2、查看安装及配置文件位置
-   Homebrew安装的软件会默认在`/usr/local/Cellar/`路径下
-   redis的配置文件`redis.conf`存放在`/usr/local/etc`路径下

3、启动redis服务
//方式一:使用brew帮助我们启动软件 
brew services start redis 
//方式二 
redis-server /usr/local/etc/redis.conf

//执行以下命令 
redis-server

4、查看redis服务进程
ps axu | grep redis

5、redis-cli连接redis服务
//redis默认端口号**6379**,默认**auth**为空,输入以下命令即可连接
redis-cli -h 127.0.0.1 -p 6379

6、启动 redis 客户端,打开终端并输入命令 **redis-cli**。该命令会连接本地的 redis 服务
$redis-cli 
redis 127.0.0.1:6379> 
redis 127.0.0.1:6379> PING 
PONG


7、关闭redis服务
//正确停止Redis的方式应该是向Redis发送SHUTDOWN命令
redis-cli shutdown

//强行终止redis
sudo pkill redis-server
复制代码

代码项目

github.com/lmandlyp163…

传统方式

我们首先搭建一个后台服务接口(实现校验库存,扣库存,创建订单),不做任何限制,使用JMeter,模拟500个并发线程测试购买10个库存的商品

思路介绍

不做任何控制,按照流程进行检查库存,扣库存,下订单,这种方式会存在并发问题

代码实现

接口入口

/**
 * 传统方式下订单
 */
@PostMapping("/createWrongOrder/{id}")
public ResponseBean createWrongOrder(@PathVariable("id") Integer id) throws Exception {
    Integer orderCount = seckillEvolutionService.createWrongOrder(id);
    return new ResponseBean(HttpStatus.OK.value(), "购买成功", orderCount);
}
复制代码

核心service 逻辑(纯DB操作):校验库存,扣库存,创建订单

@Override
@Transactional(rollbackFor = Exception.class)
public Integer createWrongOrder(Integer id) throws Exception {
    // 检查库存
    StockDto stockDto = stockDao.selectByPrimaryKey(id);
    if (stockDto.getCount() <= 0) {
        throw new CustomException("库存不足");
    }
    // 扣库存
    stockDto.setCount(stockDto.getCount() - 1);
    stockDto.setSale(stockDto.getSale() + 1);
    Integer saleCount = stockDao.updateByPrimaryKey(stockDto);
    if (saleCount <= 0) {
        throw new CustomException("扣库存失败");
    }
    // 下订单
    StockOrderDto stockOrderDto = new StockOrderDto();
    stockOrderDto.setStockId(stockDto.getId());
    Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
    if (saleCount <= 0) {
        throw new CustomException("下订单失败");
    }
    return orderCount;
}
复制代码

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

1、初始化数据库库存

image.png
2、配置JMeter

image.png

image.png

image.png
打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png
商品实际显示为卖出10,库存还有0,而订单表却有大于10条数据

Druid SQL分析

image.png

  • 发现SQL下单与更新库存74次、明显出现了库存超卖

可以发现并发事务下会出现错误,出现卖超问题,这是因为同一时间大量线程同时请求校验库存,扣库存,创建订单,这三个操作不在同一个原子,比如,很多线程同时读到库存为10,这样都穿过了校验库存的判断,所以出现卖超问题

使用乐观锁控制超卖

在传统方式下会出现超卖,所以在这种情况下就引入了的概念,锁区分为乐观锁和悲观锁,悲观锁都是牺牲性能保证数据,所以在这种高并发场景下,一般都是使用乐观锁解决

思路介绍

这次我们引入乐观锁,这里可以先查看一篇文章: MySQL那些锁(opens new window)

主要改造是扣库存,每个线程在检查库存的时候会拿到当前商品的乐观锁版本号,然后在扣库存时,如果版本号不对,就会扣减失败,抛出异常结束,这样每个版本号就只能有一个线程操作成功,其他相同版本号的线程秒杀失败,就不会存在卖超问题

代码实现

接口入口

/**
 * 使用乐观锁下订单
 */
@PostMapping("/createOptimisticLockOrder/{id}")
public ResponseBean createOptimisticLockOrder(@PathVariable("id") Integer id) throws Exception {
    Integer orderCount = seckillEvolutionService.createOptimisticLockOrder(id);
    return new ResponseBean(HttpStatus.OK.value(), "购买成功", orderCount);
}
复制代码

核心service 逻辑(纯DB操作):校验库存,扣库存,创建订单

@Override
@Transactional(rollbackFor = Exception.class)
public Integer createOptimisticLockOrder(Integer id) throws Exception {
    // 检查库存
    StockDto stockDto = stockDao.selectByPrimaryKey(id);
    if (stockDto.getCount() <= 0) {
        throw new CustomException("库存不足");
    }
    // 扣库存
    Integer saleCount = stockDao.updateByOptimisticLock(stockDto);
    if (saleCount <= 0) {
        throw new CustomException("扣库存失败");
    }
    // 下订单
    StockOrderDto stockOrderDto = new StockOrderDto();
    stockOrderDto.setStockId(stockDto.getId());
    Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
    if (saleCount <= 0) {
        throw new CustomException("下订单失败");
    }
    return orderCount;
}
复制代码
/**
 * 乐观锁更新扣减库存
 */
@Update("UPDATE t_seckill_stock SET count = count - 1, sale = sale + 1, version = version + 1 " +
        "WHERE id = #{id, jdbcType = INTEGER} AND version = #{version, jdbcType = INTEGER} " +
        "")
int updateByOptimisticLock(StockDto stockDto);
复制代码

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

1、初始化数据库库存

image.png

2、配置JMeter

image.png

image.png

打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png
商品实际显示为卖出10,库存还有0,而订单表也只有10条数据

Druid SQL分析

image.png
多个线程同时在检查库存的时候都会拿到当前商品的相同乐观锁版本号,然后在扣库存时,如果版本号不对,就会扣减失败,抛出异常结束,这样每个版本号就只能有第一个线程扣库存操作成功,其他相同版本号的线程秒杀失败,就不会存在卖超问题

使用缓存&乐观锁

思路介绍

1、一般秒杀都会提前预热缓存,我们添加一个缓存预热的方法,初始化库存后再缓存预热,这样不会出现Cache读取Miss的情况

2、这里我采用的是先更新数据库再更新缓存,因为这里缓存数据计算简单,只需要进行加减一即可,所以我们直接进行更新缓存

3、这次主要改造是检查库存和扣库存方法,检查库存直接去Redis获取,不再去查数据库,而在扣库存这里本身是使用的乐观锁操作,只有操作成功(扣库存成功)的才需要更新缓存数据

代码实现

缓存预热

/**
 * 缓存预热
 */
@PostMapping("/initCache/{id}")
public ResponseBean initCache(@PathVariable("id") Integer id) {
    StockDto stockDto = stockService.selectByPrimaryKey(id);
    // 商品缓存预热
    JedisUtil.set(Constant.PREFIX_COUNT + id.toString(), stockDto.getCount().toString());
    JedisUtil.set(Constant.PREFIX_SALE + id.toString(), stockDto.getSale().toString());
    JedisUtil.set(Constant.PREFIX_VERSION + id.toString(), stockDto.getVersion().toString());
    return new ResponseBean(HttpStatus.OK.value(), "缓存预热成功", null);
}
复制代码

接口入口

    /**
     * 使用乐观锁下订单,并且添加读缓存,性能提升
     */
    @PostMapping("/createOptimisticLockOrderWithRedis/{id}")
    public ResponseBean createOptimisticLockOrderWithRedis(@PathVariable("id") Integer id) throws Exception {
        // 错误的,线程不安全
//         Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisWrong(id);
        // 正确的,线程安全
        Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisSafe(id);
        return new ResponseBean(HttpStatus.OK.value(), "购买成功", null);
    }
复制代码

核心service 逻辑:校验库存缓存,扣库存,创建订单

@Override
@Transactional(rollbackFor = Exception.class)
public Integer createOptimisticLockOrderWithRedisSafe(Integer id) throws Exception {
    // 检查库存
    // 使用缓存读取库存,减轻DB压力,Redis批量操作(具有原子性)解决线程安全问题
    List<String> dataList = JedisUtil.mget(Constant.PREFIX_COUNT + id,
            Constant.PREFIX_SALE + id, Constant.PREFIX_VERSION + id);
    Integer count = Integer.parseInt(dataList.get(0));
    Integer sale = Integer.parseInt(dataList.get(1));
    Integer version = Integer.parseInt(dataList.get(2));
    if (count <= 0) {
        throw new CustomException("库存不足");
    }
    // 还有库存
    StockDto stockDto = new StockDto();
    stockDto.setId(id);
    stockDto.setCount(count);
    stockDto.setSale(sale);
    stockDto.setVersion(version);
    // 扣库存
    Integer saleCount = stockDao.updateByOptimisticLock(stockDto);
    // 操作数据大于0,说明扣库存成功
    if (saleCount > 0) {
        logger.info("版本号:{} {} {}", stockDto.getCount(), stockDto.getSale(), stockDto.getVersion());
        // 更新缓存,这里更新需要保证三个数据(库存,已售,乐观锁版本号)的一致性,使用mset原子操作
        updateCache(stockDto);
    }
    if (saleCount <= 0) {
        throw new CustomException("扣库存失败");
    }
    // 下订单
    StockOrderDto stockOrderDto = new StockOrderDto();
    stockOrderDto.setStockId(stockDto.getId());
    Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
    if (saleCount <= 0) {
        throw new CustomException("下订单失败");
    }
    Thread.sleep(10);
    return orderCount;
}
复制代码
/**
 * 这里遵循先更新数据库,再更新缓存,详细的数据库与缓存一致性解析可以查看
 * https://note.dolyw.com/cache/00-DataBaseConsistency.html
 */
public void updateCache(StockDto stockDto) {
    Integer count = stockDto.getCount() - 1;
    Integer sale = stockDto.getSale() + 1;
    Integer version = stockDto.getVersion() + 1;
    JedisUtil.mset(Constant.PREFIX_COUNT + stockDto.getId(), count.toString(),
            Constant.PREFIX_SALE + stockDto.getId(), sale.toString(),
            Constant.PREFIX_VERSION + stockDto.getId(), version.toString());
}
复制代码

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

0、初始化缓存库存

image.png

image.png

1、初始化数据库库存

image.png

2、配置JMeter

image.png

image.png

打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png

image.png

商品实际显示为卖出10,库存还有0,而订单表也只有10条数据

Druid SQL分析

image.png
使用了缓存,可以看到库存查询SQL只执行了一次,就是缓存预热那执行了一次,不像之前每次库存都去查数据库

分布式限流&缓存&乐观锁

思路介绍

之前说到乐观锁更新操作还是执行了近 100 次 SQL,其实这 100 次里就只有 10 次扣库存成功才是有效请求,其他的都是无效请求,为了遵从最后落地到数据库的请求数要尽量少的原则,这里我们使用限流,把大部分无效请求拦截,尽可能保证最终到达数据库的都是有效请求,限流算法参考限流算法

我们这里使用固定时间窗口最好,这里使用 Redis + Lua 的分布式限流方式

代码实现

缓存预热

/**
 * 缓存预热
 */
@PostMapping("/initCache/{id}")
public ResponseBean initCache(@PathVariable("id") Integer id) {
    StockDto stockDto = stockService.selectByPrimaryKey(id);
    // 商品缓存预热
    JedisUtil.set(Constant.PREFIX_COUNT + id.toString(), stockDto.getCount().toString());
    JedisUtil.set(Constant.PREFIX_SALE + id.toString(), stockDto.getSale().toString());
    JedisUtil.set(Constant.PREFIX_VERSION + id.toString(), stockDto.getVersion().toString());
    return new ResponseBean(HttpStatus.OK.value(), "缓存预热成功", null);
}
复制代码

Lua脚本

  • 秒级限流(每秒限制多少请求)
-- 实现原理
-- 每次请求都将当前时间,精确到秒作为 key 放入 Redis 中
-- 超时时间设置为 2s, Redis 将该 key 的值进行自增
-- 当达到阈值时返回错误,表示请求被限流
-- 写入 Redis 的操作用 Lua 脚本来完成
-- 利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

-- 资源唯一标志位
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 获取当前流量大小
local currentLimit = tonumber(redis.call('get', key) or "0")

if currentLimit + 1 > limit then
    -- 达到限流大小 返回
    return 0;
else
    -- 没有达到阈值 value + 1
    redis.call("INCRBY", key, 1)
    -- 设置过期时间
    redis.call("EXPIRE", key, 2)
    return currentLimit + 1
end
复制代码
  • 自定义参数限流(自定义多少时间限制多少请求)
-- 实现原理
-- 每次请求都去 Redis 取到当前限流开始时间和限流累计请求数
-- 判断限流开始时间加超时时间戳(限流时间)大于当前请求时间戳
-- 再判断当前时间窗口请求内是否超过限流最大请求数
-- 当达到阈值时返回错误,表示请求被限流,否则通过
-- 写入 Redis 的操作用 Lua 脚本来完成
-- 利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

-- 一个时间窗口开始时间(限流开始时间)key名称
local timeKey = KEYS[1]
-- 一个时间窗口内请求的数量累计(限流累计请求数)key名称
local requestKey = KEYS[2]
-- 限流大小,限流最大请求数
local maxRequest = tonumber(ARGV[1])
-- 当前请求时间戳
local nowTime = tonumber(ARGV[2])
-- 超时时间戳,一个时间窗口时间(毫秒)(限流时间)
local timeRequest = tonumber(ARGV[3])

-- 获取限流开始时间,不存在为0
local currentTime = tonumber(redis.call('get', timeKey) or "0")
-- 获取限流累计请求数,不存在为0
local currentRequest = tonumber(redis.call('get', requestKey) or "0")

-- 判断当前请求时间戳是不是在当前时间窗口中
-- 限流开始时间加超时时间戳(限流时间)大于当前请求时间戳
if currentTime + timeRequest > nowTime then
    -- 判断当前时间窗口请求内是否超过限流最大请求数
    if currentRequest + 1 > maxRequest then
        -- 在时间窗口内且超过限流最大请求数,返回
        return 0;
    else
        -- 在时间窗口内且请求数没超,请求数加一
        redis.call("INCRBY", requestKey, 1)
        return currentRequest + 1;
    end
else
    -- 超时后重置,开启一个新的时间窗口
    redis.call('set', timeKey, nowTime)
    redis.call('set', requestKey, '0')
    -- 设置过期时间
    redis.call("EXPIRE", timeKey, timeRequest / 1000)
    redis.call("EXPIRE", requestKey, timeRequest / 1000)
    -- 请求数加一
    redis.call("INCRBY", requestKey, 1)
    return 1;
end
复制代码

接口入口

/**
 * 使用乐观锁下订单,并且添加读缓存,再添加限流
 */
@Limit
@PostMapping("/createOptimisticLockOrderWithRedisLimit/{id}")
public ResponseBean createOptimisticLockOrderWithRedisLimit(@PathVariable("id") Integer id) throws Exception {
    // 正确的,线程安全
    Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisSafe(id);
    return new ResponseBean(HttpStatus.OK.value(), "购买成功", null);
}
复制代码

限流注解

/**
 * 限流注解
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Limit {

    /**
     * 限流最大请求数
     * @return
     */
    String maxRequest() default "10";

    /**
     * 一个时间窗口(毫秒)
     * @return
     */
    String timeRequest() default "1000";

}
复制代码

LimitAspect限流切面

/**
 * LimitAspect限流切面
 */
@Order(0)
@Aspect
@Component
public class LimitAspect {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(LimitAspect.class);

    /**
     * 一个时间窗口时间(毫秒)(限流时间)
     */
    private static final String TIME_REQUEST = "1000";

    /**
     * RedisLimitUtil
     */
    @Autowired
    private RedisLimitUtil redisLimitUtil;

    /**
     * 对应注解
     */
    @Pointcut("@annotation(com.example.limit.Limit)")
    public void aspect() {}

    /**
     * 切面
     */
    @Around("aspect() && @annotation(limit)")
    public Object Interceptor(ProceedingJoinPoint proceedingJoinPoint, Limit limit) {
        Object result = null;
        Long maxRequest = 0L;
        // 一个时间窗口(毫秒)为1000的话默认调用秒级限流判断(每秒限制多少请求)
        if (TIME_REQUEST.equals(limit.timeRequest())) {
            maxRequest = redisLimitUtil.limit(limit.maxRequest());
        } else {
            maxRequest = redisLimitUtil.limit(limit.maxRequest(), limit.timeRequest());
        }
        // 返回请求数量大于0说明不被限流
        if (maxRequest > 0) {
            // 放行,执行后续方法
            try {
                result = proceedingJoinPoint.proceed();
            } catch (Throwable throwable) {
                throw new CustomException(throwable.getMessage());
            }
        } else {
            // 直接返回响应结果
            throw new CustomException("请求拥挤,请稍候重试");
        }
        return result;
    }

    /**
     * 执行方法前再执行
     */
    @Before("aspect() && @annotation(limit)")
    public void before(Limit limit) {
        // logger.info("before");
    }

    /**
     * 执行方法后再执行
     */
    @After("aspect() && @annotation(limit)")
    public void after(Limit limit) {
        // logger.info("after");
    }

}
复制代码

RedisLimitUtil

/**
 * RedisLimitUtil
 */
@Component
public class RedisLimitUtil {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(RedisLimitUtil.class);

    /**
     * 秒级限流(每秒限制多少请求)字符串脚本
     */
    private static String LIMIT_SECKILL_SCRIPT = null;

    /**
     * 自定义参数限流(自定义多少时间限制多少请求)字符串脚本
     */
    private static String LIMIT_CUSTOM_SCRIPT = null;

    /**
     * redis-key-前缀-limit-限流
     */
    private static final String LIMIT = "limit:";

    /**
     * redis-key-名称-limit-一个时间窗口内请求的数量累计(限流累计请求数)
     */
    private static final String LIMIT_REQUEST = "limit:request";

    /**
     * redis-key-名称-limit-一个时间窗口开始时间(限流开始时间)
     */
    private static final String LIMIT_TIME = "limit:time";

    /**
     * 构造方法初始化加载Lua脚本
     */
    public RedisLimitUtil() {
        LIMIT_SECKILL_SCRIPT = getScript("redis/limit-seckill.lua");
        LIMIT_CUSTOM_SCRIPT = getScript("redis/limit-custom.lua");
    }

    /**
     * 秒级限流判断(每秒限制多少请求)
     */
    public Long limit(String maxRequest) {
        // 获取key名,当前时间戳
        String key = LIMIT + String.valueOf(System.currentTimeMillis() / 1000);
        // 传入参数,限流最大请求数
        List<String> args = new ArrayList<>();
        args.add(maxRequest);
        return eval(LIMIT_SECKILL_SCRIPT, Collections.singletonList(key), args);
    }

    /**
     * 自定义参数限流判断(自定义多少时间限制多少请求)
     */
    public Long limit(String maxRequest, String timeRequest) {
        // 获取key名,一个时间窗口开始时间(限流开始时间)和一个时间窗口内请求的数量累计(限流累计请求数)
        List<String> keys = new ArrayList<>();
        keys.add(LIMIT_TIME);
        keys.add(LIMIT_REQUEST);
        // 传入参数,限流最大请求数,当前时间戳,一个时间窗口时间(毫秒)(限流时间)
        List<String> args = new ArrayList<>();
        args.add(maxRequest);
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(timeRequest);
        return eval(LIMIT_CUSTOM_SCRIPT, keys, args);
    }

    /**
     * 执行Lua脚本方法
     */
    private Long eval(String script, List<String> keys, List<String> args) {
        // 执行脚本
        Object result = JedisUtil.eval(script, keys, args);
        // 结果请求数大于0说明不被限流
        return (Long) result;
    }

    /**
     * 获取Lua脚本
     */
    private static String getScript(String path) {
        StringBuilder stringBuilder = new StringBuilder();
        InputStream inputStream = RedisLimitUtil.class.getClassLoader().getResourceAsStream(path);
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
            String str;
            while ((str = bufferedReader.readLine()) != null) {
                stringBuilder.append(str).append(System.lineSeparator());
            }
        } catch (IOException e) {
            logger.error(Arrays.toString(e.getStackTrace()));
            throw new CustomException("获取Lua限流脚本出现问题: " + Arrays.toString(e.getStackTrace()));
        }
        return stringBuilder.toString();
    }

}
复制代码

JedisUtil

    /**
     * 脚本执行
     */
    public static Object eval(String script, List<String> keys, List<String> args) {
        Object result = null;
        try (Jedis jedis = jedisPool.getResource()) {
            result = jedis.eval(script, keys, args);
            return result;
        } catch (Exception e) {
            throw new CustomException("Redis脚本执行eval方法异常:script=" + script + " keys=" +
                    keys.toString() + " args=" + args.toString() + " cause=" + e.getMessage());
        }
    }
复制代码

核心service 逻辑先限流,再校验库存缓存,扣库存,创建订单

@Override
@Transactional(rollbackFor = Exception.class)
public Integer createOptimisticLockOrderWithRedisSafe(Integer id) throws Exception {
    // 检查库存
    // 使用缓存读取库存,减轻DB压力,Redis批量操作(具有原子性)解决线程安全问题
    List<String> dataList = JedisUtil.mget(Constant.PREFIX_COUNT + id,
            Constant.PREFIX_SALE + id, Constant.PREFIX_VERSION + id);
    Integer count = Integer.parseInt(dataList.get(0));
    Integer sale = Integer.parseInt(dataList.get(1));
    Integer version = Integer.parseInt(dataList.get(2));
    if (count <= 0) {
        throw new CustomException("库存不足");
    }
    // 还有库存
    StockDto stockDto = new StockDto();
    stockDto.setId(id);
    stockDto.setCount(count);
    stockDto.setSale(sale);
    stockDto.setVersion(version);
    // 扣库存
    Integer saleCount = stockDao.updateByOptimisticLock(stockDto);
    // 操作数据大于0,说明扣库存成功
    if (saleCount > 0) {
        logger.info("版本号:{} {} {}", stockDto.getCount(), stockDto.getSale(), stockDto.getVersion());
        // 更新缓存,这里更新需要保证三个数据(库存,已售,乐观锁版本号)的一致性,使用mset原子操作
        updateCache(stockDto);
    }
    if (saleCount <= 0) {
        throw new CustomException("扣库存失败");
    }
    // 下订单
    StockOrderDto stockOrderDto = new StockOrderDto();
    stockOrderDto.setStockId(stockDto.getId());
    Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
    if (saleCount <= 0) {
        throw new CustomException("下订单失败");
    }
    Thread.sleep(10);
    return orderCount;
}
复制代码
/**
 * 这里遵循先更新数据库,再更新缓存,详细的数据库与缓存一致性解析可以查看
 * https://note.dolyw.com/cache/00-DataBaseConsistency.html
 */
public void updateCache(StockDto stockDto) {
    Integer count = stockDto.getCount() - 1;
    Integer sale = stockDto.getSale() + 1;
    Integer version = stockDto.getVersion() + 1;
    JedisUtil.mset(Constant.PREFIX_COUNT + stockDto.getId(), count.toString(),
            Constant.PREFIX_SALE + stockDto.getId(), sale.toString(),
            Constant.PREFIX_VERSION + stockDto.getId(), version.toString());
}
复制代码

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

0、初始化缓存库存

image.png

image.png

1、初始化数据库库存

image.png

2、配置JMeter

image.png
PS: 这次我们填写 Ramp-Up 时间为 5 秒,意思为执行 5 秒,每秒执行 100 个并发,因为如果都在 1S 内执行完,会被限流

image.png

打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png
我们看下后台日志,可以看到很多请求直接被限流限制了,这样就达到了我们的目的

image.png

image.png

商品实际显示为卖出10,库存还有0,而订单表也只有10条数据

Druid SQL分析

image.png
使用了限流,可以看到乐观锁更新不像之前那样执行 61 次了,只执行了 19 次,很多请求直接被限流了

异步下单

那我们还可以怎么优化提高吞吐量以及性能呢,我们上文所有例子其实都是同步请求,完全可以利用同步转异步来提高性能,这里我们将下订单的操作进行异步化,利用消息队列来进行解耦,这样可以然 DB 异步执行下单

每当一个请求通过了限流和库存校验之后就将订单信息发给消息队列,这样一个请求就可以直接返回了,消费程序做下订单的操作,对数据进行入库落地,因为异步了,所以最终需要采取回调或者是其他提醒的方式提醒用户购买完成

参考

  • 感谢网络大神们的笔记资料

www.mamicode.com/info-detail…
time.geekbang.org/column/arti…
note.dolyw.com/distributed…
note.dolyw.com/seckill-evo…
www.cnblogs.com/stulzq/p/89…