redis分布式缓存(三十)一一🚀二级缓存的高并发文章PV

高并发文章的阅读量PV业务场景分析

在一些需要统计PV(Page View), 即页面浏览量或点击量高并发系统中,如:知乎文章浏览量,淘宝商品页浏览量等,需要统计相应数据做分析。

假设知乎每天有统计10万篇文章,每篇文章的访问量10万,如果采用redis的incr命令来实现计数器的话,每天redis=100亿次的写操作,redis大约QPS=11.57万。

如此大的并发量,CPU肯定满负载运行,网络资源消耗也巨大,所有此种技术方案是行不通的。

100亿次写操作

客户端1

10万请求

客户端2

...

redis

二级缓存的高并发文章的阅读量PV技术方案

  1. 如此大的并发量,唯一办法就是减少redis的访问量
  • 文章服务采用了集群部署,在线上可以部署多台
  • 每个文章服务,增加了一级JVM缓存,即用Map存储在jvm,key为当前请求所属的时间块。
  • Map<Long,Map<Integer,Integer> > = Map<时间块,Map<文章id,访问量>>
  1. 什么是时间块?
  • 就是把时间切割为一块块,例如:一般文章在1小时,30分钟、5分钟的时间内产生了多少阅读量。
  1. 那如何切割时间块呢?
  • 时间戳是自 1970 年1月1日(00:00:00 GMT)至当前时间的总数,通过确定时间块大小,算出当前请求所属的时间戳从1970年算起位于第几个时间块,这个算出来的第几个时间块就是小时key即是map的key
    • 如:我们要计算“小时块”,先把当前的时间转换为为毫秒的时间戳,然后除以一个小时,即当前时间T/1000*60*60=小时key,然后用这个小时序号作为key
    • 时间戳在线转换工具
    • 2021-11-09 15:30:00 = 1636443000000毫秒
    • 小时key=1636443000000/1000*60*60=454567.5=454567(取整数,即是距离1970年开始算的第454567个时间块)
    • Map<时间块,Map<文章id,访问量>> = Map<454567,Map<文章id,访问量>>
    • 以此类推,每一次PV操作时,先计算当前时间是那个时间块,然后存储Map中。

image.png

实战:SpringBoot+Redis

步骤1:PV请求处理逻辑

    //保存时间块和pv数据的map
    public  static final  Map<Long, Map<Integer,Integer>> PV_MAP=new ConcurrentHashMap();

    /**
     * pv请求调用:
     * 即当前时间T/1000*60*60=小时key,然后用这个小时序号作为key。
     * 例如:
     * 2021-11-09 15:30:00 = 1636443000000毫秒 
     * 小时key=1636443000000/1000\*60\*60=454567.5=454567
     *
     * 每一次PV操作时,先计算当前时间是那个时间块,然后存储Map中。
     * @param id 文章id
     */
    public void addPV(Integer id) {
        //生成环境:时间块为5分钟
        //为了方便测试 改为1分钟 时间块
        int timer=1;
        long m1=System.currentTimeMillis()/(1000*60*timer);
        //拿出这个时间块的所有文章数据
        Map<Integer,Integer> mMap=Constants.PV_MAP.get(m1);
        if (CollectionUtils.isEmpty(mMap)){
            mMap=new ConcurrentHashMap();
            mMap.put(id,new Integer(1));
            //<1分钟的时间块,Map<文章Id,访问量>>
            Constants.PV_MAP.put(m1, mMap);
        }else {
            //通过文章id 取出浏览量
            Integer value=mMap.get(id);
            if (value==null){
                mMap.put(id,new Integer(1));
            }else{
                mMap.put(id,value+1);
            }
        }
    }

复制代码

步骤2:一级缓存定时器消费

定时(5分钟)从jvm的map把时间块的阅读pv取出来,然后push到reids的list数据结构中,list的存储的数据为Map<文章id,访问量PV>即每个时间块的pv数据


    /**
     * 一级缓存定时器消费调用方法:
     * 定时器,定时(5分钟)从jvm的map把时间块的阅读pv取出来,
     * 然后push到reids的list数据结构中,list的存储的书为Map<文章id,访问量PV>即每个时间块的pv数据
     */
    public void consumePV(){
        //为了方便测试 改为1分钟 时间块
        long m1=System.currentTimeMillis()/(1000*60*1);
        Iterator<Long> iterator= Constants.PV_MAP.keySet().iterator();
        while (iterator.hasNext()){
            //取出map的时间块
            Long key=iterator.next();
            //小于当前的分钟时间块key ,就消费
            if (key<m1){
                //先push
                Map<Integer,Integer> map=Constants.PV_MAP.get(key);
                //push到reids的list数据结构中,list的存储的书为Map<文章id,访问量PV>即每个时间块的pv数据
                this.redisTemplate.opsForList().leftPush(Constants.CACHE_PV_LIST,map);
                //后remove
                Constants.PV_MAP.remove(key);
                log.info("push进{}",map);
            }
        }
    }

复制代码

步骤3:二级缓存定时器消费

定时(5分钟),从redis的list数据结构pop弹出Map<文章id,访问量PV>,弹出来做了2件事:

  1. 先把Map<文章id,访问量PV>,保存到数据库
  2. 再把Map<文章id,访问量PV>,同步到redis缓存的计数器incr

    /**
     * 二级缓存定时器消费
     * 定时器,定时(5分钟),从redis的list数据结构pop弹出Map<文章id,访问量PV>,弹出来做了2件事:
     * 第一件事:先把Map<文章id,访问量PV>,保存到数据库
     * 第二件事:再把Map<文章id,访问量PV>,同步到redis缓存的计数器incr。
     */
    public boolean pop(){
        //从redis的list数据结构pop弹出Map<文章id,访问量PV>
        ListOperations<String, Map<Integer,Integer>> operations= this.redisTemplate.opsForList();
        Map<Integer,Integer> map= operations.rightPop(Constants.CACHE_PV_LIST);
        log.info("弹出pop={}",map);
        if (CollectionUtils.isEmpty(map)){
            return false;
        }
        // 第一步:先存入数据库
        // TODO: 插入数据库

        //第二步:同步redis缓存
        for (Map.Entry<Integer,Integer> entry:map.entrySet()){
//            log.info("key={},value={}",entry.getKey(),entry.getValue());
            String key=Constants.CACHE_ARTICLE+entry.getKey();
            //调用redis的increment命令
            long n=this.redisTemplate.opsForValue().increment(key,entry.getValue());
//            log.info("key={},pv={}",key, n);
        }
        return true;
    }

}
复制代码

步骤4:查看浏览量

用了一级缓存,所有的高并发流量都收集到了本地JVM,然后5分钟同步给二级缓存,从而给redis降压

    @GetMapping(value = "/view")
    public String view(Integer id) {
       //文章pv的key
        String key= Constants.CACHE_ARTICLE+id;
        //调用redis的get命令
        String n=this.stringRedisTemplate.opsForValue().get(key);
        log.info("key={},阅读量为{}",key, n);
        return n;
    }
复制代码

redis分布式缓存系列文章

上一章节: redis分布式缓存(二十九)一一 微信抢红包解决方案

  • 👍🏻:有收获的,点赞鼓励!
  • ❤️:收藏文章,方便回看!
  • 💬:评论交流,互相进步!