高并发文章的阅读量PV业务场景分析
在一些需要统计PV(Page View), 即页面浏览量或点击量高并发系统中,如:知乎文章浏览量,淘宝商品页浏览量等,需要统计相应数据做分析。
假设知乎每天有统计10万篇文章,每篇文章的访问量10万,如果采用redis的incr命令来实现计数器的话,每天redis=100亿次的写操作,redis大约QPS=11.57万。
如此大的并发量,CPU肯定满负载运行,网络资源消耗也巨大,所有此种技术方案是行不通的。
二级缓存的高并发文章的阅读量PV技术方案
- 如此大的并发量,唯一办法就是减少redis的访问量
- 文章服务采用了集群部署,在线上可以部署多台
- 每个文章服务,增加了一级JVM缓存,即用Map存储在jvm,key为当前请求所属的时间块。
Map<Long,Map<Integer,Integer> > = Map<时间块,Map<文章id,访问量>>
- 什么是时间块?
- 就是把时间切割为一块块,例如:一般文章在1小时,30分钟、5分钟的时间内产生了多少阅读量。
- 那如何切割时间块呢?
时间戳是自 1970 年1月1日(00:00:00 GMT)至当前时间的总数,通过确定时间块大小,算出当前请求所属的时间戳从1970年算起位于第几个时间块,这个算出来的第几个时间块就是小时key即是map的key- 如:我们要计算“小时块”,先把当前的时间转换为为毫秒的
时间戳,然后除以一个小时,即当前时间T/1000*60*60=小时key,然后用这个小时序号作为key - 时间戳在线转换工具
- 2021-11-09 15:30:00 = 1636443000000毫秒
- 小时key=1636443000000/1000*60*60=454567.5=454567(取整数,即是距离1970年开始算的第454567个时间块)
Map<时间块,Map<文章id,访问量>> = Map<454567,Map<文章id,访问量>>- 以此类推,每一次PV操作时,先计算当前时间是那个时间块,然后存储Map中。
- 如:我们要计算“小时块”,先把当前的时间转换为为毫秒的
实战:SpringBoot+Redis
步骤1:PV请求处理逻辑
//保存时间块和pv数据的map
public static final Map<Long, Map<Integer,Integer>> PV_MAP=new ConcurrentHashMap();
/**
* pv请求调用:
* 即当前时间T/1000*60*60=小时key,然后用这个小时序号作为key。
* 例如:
* 2021-11-09 15:30:00 = 1636443000000毫秒
* 小时key=1636443000000/1000\*60\*60=454567.5=454567
*
* 每一次PV操作时,先计算当前时间是那个时间块,然后存储Map中。
* @param id 文章id
*/
public void addPV(Integer id) {
//生成环境:时间块为5分钟
//为了方便测试 改为1分钟 时间块
int timer=1;
long m1=System.currentTimeMillis()/(1000*60*timer);
//拿出这个时间块的所有文章数据
Map<Integer,Integer> mMap=Constants.PV_MAP.get(m1);
if (CollectionUtils.isEmpty(mMap)){
mMap=new ConcurrentHashMap();
mMap.put(id,new Integer(1));
//<1分钟的时间块,Map<文章Id,访问量>>
Constants.PV_MAP.put(m1, mMap);
}else {
//通过文章id 取出浏览量
Integer value=mMap.get(id);
if (value==null){
mMap.put(id,new Integer(1));
}else{
mMap.put(id,value+1);
}
}
}
复制代码
步骤2:一级缓存定时器消费
定时(5分钟)从jvm的map把时间块的阅读pv取出来,然后push到reids的list数据结构中,list的存储的数据为Map<文章id,访问量PV>即每个时间块的pv数据
/**
* 一级缓存定时器消费调用方法:
* 定时器,定时(5分钟)从jvm的map把时间块的阅读pv取出来,
* 然后push到reids的list数据结构中,list的存储的书为Map<文章id,访问量PV>即每个时间块的pv数据
*/
public void consumePV(){
//为了方便测试 改为1分钟 时间块
long m1=System.currentTimeMillis()/(1000*60*1);
Iterator<Long> iterator= Constants.PV_MAP.keySet().iterator();
while (iterator.hasNext()){
//取出map的时间块
Long key=iterator.next();
//小于当前的分钟时间块key ,就消费
if (key<m1){
//先push
Map<Integer,Integer> map=Constants.PV_MAP.get(key);
//push到reids的list数据结构中,list的存储的书为Map<文章id,访问量PV>即每个时间块的pv数据
this.redisTemplate.opsForList().leftPush(Constants.CACHE_PV_LIST,map);
//后remove
Constants.PV_MAP.remove(key);
log.info("push进{}",map);
}
}
}
复制代码
步骤3:二级缓存定时器消费
定时(5分钟),从redis的list数据结构pop弹出Map<文章id,访问量PV>,弹出来做了2件事:
- 先把Map<文章id,访问量PV>,保存到数据库
- 再把Map<文章id,访问量PV>,同步到redis缓存的计数器incr
/**
* 二级缓存定时器消费
* 定时器,定时(5分钟),从redis的list数据结构pop弹出Map<文章id,访问量PV>,弹出来做了2件事:
* 第一件事:先把Map<文章id,访问量PV>,保存到数据库
* 第二件事:再把Map<文章id,访问量PV>,同步到redis缓存的计数器incr。
*/
public boolean pop(){
//从redis的list数据结构pop弹出Map<文章id,访问量PV>
ListOperations<String, Map<Integer,Integer>> operations= this.redisTemplate.opsForList();
Map<Integer,Integer> map= operations.rightPop(Constants.CACHE_PV_LIST);
log.info("弹出pop={}",map);
if (CollectionUtils.isEmpty(map)){
return false;
}
// 第一步:先存入数据库
// TODO: 插入数据库
//第二步:同步redis缓存
for (Map.Entry<Integer,Integer> entry:map.entrySet()){
// log.info("key={},value={}",entry.getKey(),entry.getValue());
String key=Constants.CACHE_ARTICLE+entry.getKey();
//调用redis的increment命令
long n=this.redisTemplate.opsForValue().increment(key,entry.getValue());
// log.info("key={},pv={}",key, n);
}
return true;
}
}
复制代码
步骤4:查看浏览量
用了一级缓存,所有的高并发流量都收集到了本地JVM,然后5分钟同步给二级缓存,从而给redis降压。
@GetMapping(value = "/view")
public String view(Integer id) {
//文章pv的key
String key= Constants.CACHE_ARTICLE+id;
//调用redis的get命令
String n=this.stringRedisTemplate.opsForValue().get(key);
log.info("key={},阅读量为{}",key, n);
return n;
}
复制代码
redis分布式缓存系列文章
上一章节: redis分布式缓存(二十九)一一 微信抢红包解决方案
- 👍🏻:有收获的,点赞鼓励!
- ❤️:收藏文章,方便回看!
- 💬:评论交流,互相进步!




近期评论