Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • Redis基础

  • 源码分析

  • 实战应用

    • 缓存问题&更新策略
    • 缓存最佳实践
      • 1.短信登录验证
      • 2.解决缓存问题
        • 封装工具类代码复用
      • 3.优惠券秒杀
      • 4.分布式锁优化
        • Redisson
        • 异步秒杀优化
        • Redis模拟MQ
      • 5.点赞功能
      • 6.共同关注
      • 7.用户签到
      • 8.UV统计
    • Redis面试
  • 单机缓存

  • Redis
  • 实战应用
phan
2023-05-15
目录

缓存最佳实践

# 缓存最佳实践

# 1.短信登录验证

业务流程:

  • 发送短信验证码时,将生成的验证码存入redis中
  • 登录校验时从redis取出验证码,和前端输入的数据进行比对,如果一致则根据用户号码生成用户信息,并将用户对象存入redis中,其中key设置为一个UUID的token,value为user对象序列化后的json数据。最终后端将token发送给前端,用于校验取出redis的用户数据,前端收到后每次请求都会在请求头authorization的字段。
  • 设置两个拦截器,一个用于拦截所有路径,更新redis用户信息的过期时间;另一个则用于校验登录信息,判断ThreadLocal内用户是否为空。

注意,这里拦截器的StringRedisTemplate对象不能自动注入,因为拦截器不是spring自动生成的,因此需要通过构造函数来输入对象。需要在配置类里面通过@Resource进行自动装配。

# 2.解决缓存问题

①互斥锁:线程需要等待,保证一致性。使用setnx实现互斥锁,释放锁时则删除key

  • 查redis:①有数据直接返回②有空数据直接返回(解决缓存穿透)
  • 申请锁:①如果没有获得则重新进入函数②如果获得锁,再检查一次redis有数据直接返回(二次检查),redis没有则查数据库。
  • 释放锁
public Shop queryWithmutex(Long id) {
        //1.查redis
        //2.申请锁查数据库
        String lockkey = "lock" + id;
        Shop shop = null;
        try {
            boolean b = tryLock(lockkey);
            if (!b) {
                Thread.sleep(50);
                return queryWithmutex(id);
            }
            //二次查询数据库
            String res = stringRedisTemplate.opsForValue().get(id.toString());
            if (StrUtil.isNotBlank(res)) {
                return JSONUtil.toBean(res, Shop.class);
            }
            shop = getById(id);
            Thread.sleep(200);
            if (shop == null) {
                //解决缓存穿透
                stringRedisTemplate.opsForValue().set(id.toString(), "",2,TimeUnit.MINUTES);
                return null;
            }
            stringRedisTemplate.opsForValue().set(id.toString(), JSONUtil.toJsonStr(shop), 30, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            unlock(lockkey);
        }
        return shop;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

②逻辑过期:线程无需等待,性能好。不保证一致性。(互斥锁+线程池)

image-20230312190116955

# 封装工具类代码复用

public <R,ID> R queryWithPassThrough(
   String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
        String key = keyPrefix + id;
        // 1.从redis查询商铺缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2.判断是否存在
        if (StrUtil.isNotBlank(json)) {
            // 3.存在,直接返回
            return JSONUtil.toBean(json, type);
        }
        // 判断命中的是否是空值
        if (json != null) {
            // 返回一个错误信息
            return null;
        }
        // 4.不存在,根据id查询数据库
        R r = dbFallback.apply(id);
        // 5.不存在,返回错误
        if (r == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回错误信息
            return null;
        }
        // 6.存在,写入redis
        this.set(key, r, time, unit);
        return r;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 3.优惠券秒杀

订单表ID自增存在的问题:

  • id的规律性太明显,用户容易猜到一天卖了多少单
  • 受到数据量限制

基于Redis自增生成全局唯一ID:时间戳+自增id计数器

超卖问题:①查询库存②库存减一。由于访问临界资源时线程不互斥,因此会导致多个线程同时争夺同一个资源情况。

  • 悲观锁:操作数据之前先获取锁,确保线程串行执行。

  • 乐观锁:只有在更新数据时才去判断有没有其它线程对数据做了修改。库存减一后,版本号进行加一。

    缺点:同一时刻进入大量请求,那么最终只能有一个线程能够修改库存成功(其它线程CAS发现库存和查找时的结果不一致,更新失败),失败率提高。

    采用库存是否大于0判断代替CAS,条件没有那么严格。

boolean res = seckillVoucherService.update().setSql("stock=stock-1")
                .eq("voucher_id", voucherId).gt("stock", 0).update();
1
2

# 4.分布式锁优化

一人一单场景:防止一个用户抢购多个秒杀券。因此需要在数据库减库存之前,在订单表中判断是否有当前user_id和voucher_id的购买记录。没有再进行库存减一。

  • 问题:先查后改存在并发问题。

解决方案:对用户id添加synchronized锁,降低锁的粒度。此外由于toString函数每次都会new一个新的对象,因此需要使用intern根据常量池的变量进行判断。并且在整个事务提交完成后才能释放锁。执行事务时需要使用AopContext.currentProxy()来获取代理类,只有获取事务有关的代理对象,事务才能生效(spring内部实现事务本质是通过代理类),否则通过this调用没有事务功能。

 synchronized(userId.toString().intern()) {
     IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
     return proxy.createVoucherOrder(voucherId);
 }
@Transactional
    public  Result createVoucherOrder(Long voucherId) {}
1
2
3
4
5
6
  • 问题:synchronized时jvm层面的锁,当在微服务集群模式下,同一个用户依旧可以在多个微服务节点上进行下多个秒杀券。

分布式锁:setnx+ex指定过期时间,防止redis宕机锁不能被释放,从而整个服务不可用。key可以设置为用户id+秒杀券id,value设置为线程id。

public boolean tryLock(long timeoutSec) {
    long id = Thread.currentThread().getId();
    Boolean success=stringRedisTemplate.opsForValue().
            setIfAbsent(KEY_PREFIX+name, id+"", timeoutSec, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
}
public void unlock() {
    stringRedisTemplate.delete(KEY_PREFIX+name);
}
1
2
3
4
5
6
7
8
9
  • 问题:线程1被阻塞,唤醒后释放线程2的锁

释放锁时,当前线程判断redis里面存放的锁是否是自己的当前线程的锁。其中key设置为用户id,value设置为UUID+线程id。

private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
public boolean tryLock(long timeoutSec) {
    String id = ID_PREFIX+Thread.currentThread().getId();
    Boolean success=stringRedisTemplate.opsForValue().
            setIfAbsent(KEY_PREFIX+name, id, timeoutSec, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
}
public void unlock() {
    String mylockid = ID_PREFIX+Thread.currentThread().getId();
    String currlock = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    if (mylockid.equals(currlock)) {
        stringRedisTemplate.delete(KEY_PREFIX+name);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# Redisson

可重入锁:一个线程多次想要获取锁。Redisson使用Hash结构代替了String结构,当第二次想要获取锁时,只需对应的field的value加一。

image-20230313143735959

可重试锁:获取锁失败后,再次尝试重新获取。

# 异步秒杀优化

  • 秒杀库存信息以及当前用户购买订单信息添加到redis中
  • redis使用lua脚本判断库存充足,一人一单(目的保证原子性)
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列。开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能,添加数据库。

线程池初始化(@PostConstruct对象加载构造完执行)

private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService seckill_order_handler = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
    seckill_order_handler.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                VoucherOrder order = orderTasks.take();
                //数据库实现下单业务
                handleVoucherOrder(order);
            } catch (InterruptedException e) {
                log.error("处理订单");
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

整体业务流程

public Result seckillVoucher(Long voucherId) {
    Long id = UserHolder.getUser().getId();
    //执行lua脚本,结果为0则表示具有购买资格
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), id.toString());
    int r = result.intValue();
    if (r != 0) {
        return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
    }
    VoucherOrder voucherOrder = new VoucherOrder();
    long order = redisIdWorker.nextId("order");
    voucherOrder.setId(order);
    voucherOrder.setUserId(UserHolder.getUser().getId());
    voucherOrder.setVoucherId(voucherId);
    //添加进阻塞队列
    orderTasks.add(voucherOrder);
    proxy = (IVoucherOrderService)AopContext.currentProxy();
    return Result.ok(order);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# Redis模拟MQ

1.基于list队列BRPOP,LPUSH

2.PubSub,支持发布订阅,多生产多消费

3.stream

# 5.点赞功能

  • 场景:同一个用户只能点赞一次,再次点击则取消点赞。当前用户已经点赞,则点赞按钮高亮显示。

解决:使用Set集合来存储每个博客的点赞信息,其中key为博客id,成员value为每个点过赞用户id。

注意由于Zset没有判空函数,因此这里采用获取成员score函数来代替此功能,根据能否读出这个分数来判断当前用户是否点赞。

public Result likeBlog(Long id) {
    Long userid = UserHolder.getUser().getId();
    String key = "blog:liked:" + id;
    Double score = stringRedisTemplate.opsForZSet().score(key, userid.toString());
    if (score==null) {
        boolean id1 = update().setSql("liked=liked+1").eq("id", id).update();
        if (id1) {
            stringRedisTemplate.opsForZSet().add(key, userid.toString(),System.currentTimeMillis());
        }
    } else {
        boolean id1 = update().setSql("liked=liked-1").eq("id", id).update();
        if (id1) {
            stringRedisTemplate.opsForZSet().remove(key, userid.toString());
        }

    }
    return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  • 场景:按照点赞时间先后显示前五名用户

解决:使用Zset,并使用当前时间戳作为分数,添加进redis的用户越早时间戳越小。

注意这里如果只是in字段,那么select语句结果不能保证按照时间戳顺序,因此需要添加order by field字段,保证按照从set集合中取出来的id顺序。

public Result queryBlogLikes(Long id) {
    String key = BLOG_LIKED_KEY + id;
    Set<String> set = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if (set == null || set.isEmpty()) {
        return Result.ok(Collections.emptyList());
    }
    List<Long> ids = set.stream().map(Long::valueOf).collect(Collectors.toList());
    String idstr = StrUtil.join(",", ids);
    List<UserDTO> userDTO = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+idstr+")").list()
            .stream().map(user-> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());

    return Result.ok(userDTO);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 6.共同关注

  • 场景:查询用户和当前用户的共同关注

解决:使用Set集合的求交集功能。需要用redis保存用户关注列表,key是用户id,value是关注所有用户id。

public Result followCommons(Long id) {
    Long id1 = UserHolder.getUser().getId();
    String key = "follows:" + id1;
    String key1= "follows:" + id;
    Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key1);
    if (intersect == null || intersect.isEmpty()) {
        return Result.ok(Collections.emptyList());
    }
    List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
    List<UserDTO> collect = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
            .collect(Collectors.toList());
    return Result.ok(collect);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  • 场景:关注推送,Feed流

解决:①每次添加更新博客文章时,向数据库插入文章。②从数据库中找出当前用户的粉丝(也就是关注当前用户的所有用户)③然后往Zset中存放推送文章数据,key为每个粉丝的用户id,每个集合成员为添加的博客id,score为当前时间戳。

添加博客-消息推送

public Result saveBlog(Blog blog) {
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());
    boolean issuccess = save(blog);
    if (!issuccess) {
        return Result.fail("新增笔记失败");
    }
    List<Follow> follows =followService.query().eq("follow_user_id", user.getId()).list();
    for (Follow follow : follows) {
        Long userId = follow.getUserId();
        String key = "feed:" + userId;
        stringRedisTemplate.opsForZSet().add(key, blog.getId().toString() , System.currentTimeMillis());
    }
    return Result.ok(blog.getId());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Zset滚动分页查询(max min offset count):在得分在max-min范围内,从max偏移offset个数据获取count个数据。

public Result queryBlogOfFollow(Long lastId, Integer offset) {
    Long userId = UserHolder.getUser().getId();
    String key = FEED_KEY + userId;
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, lastId, offset, 2);
    if (typedTuples == null || typedTuples.isEmpty()) {
        return Result.ok();
    }
    List<Long> ids = new ArrayList<>(typedTuples.size());
    long minTime=0;
    int os=1;
    for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
        ids.add(Long.valueOf(tuple.getValue()));
        long time = tuple.getScore().longValue();
        if (time == minTime) {
            os++;
        } else {
            minTime = time;
            os = 1;
        }
    }
    String idstr = StrUtil.join(",", ids);
    List<Blog> list = query().in("id", ids).last("ORDER BY FIELD(id," + idstr + ")").list();
    for (Blog blog : list) {
        queryBlogUser(blog);
        isBlogLiked(blog);
    }
    ScrollResult r = new ScrollResult();
    r.setList(list);
    r.setOffset(os);
    r.setMinTime(minTime);
    return Result.ok(r);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 7.用户签到

  • 场景:使用数据库表存储签到表的数据量庞大

解决:使用位图来记录签到状态,一个bit对应一天的签到记录。

  • 场景:签到统计,连续天数

解决:取出当月的签到记录(十进制),然后统计连续签到天数

public Result signCount() {
    Long id = UserHolder.getUser().getId();
    LocalDateTime now = LocalDateTime.now();
    String key = "sign:" + id + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
    int dayOfMonth = now.getDayOfMonth();
    List<Long> result = stringRedisTemplate.opsForValue().bitField(
            key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
    );
    if (result == null || result.isEmpty()) {
        return Result.ok(0);
    }
    Long num = result.get(0);
    if (num == null || num == 0) {
        return Result.ok(0);
    }
    int count = 0;
    while (true) {
        if ((num & 1) == 0) {
            break;
        } else {
            count++;
        }
        num >>>= 1;
    }
    return Result.ok(count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 8.UV统计

UV:独立访客量,统计浏览网页的用户数量,一天内同个用户访问多次只记录一次

PV:页面访问量,用户每访问一次,就统计一次。

解决:使用HyperLogLog

  void testHyperLogLog() {
        String userkey="user_"+UserHolder.getUser().getId();
        stringRedisTemplate.opsForHyperLogLog().add("uv", userkey);
        // 统计数量
        Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
}
1
2
3
4
5
6
编辑 (opens new window)
#Redis
上次更新: 2023/12/15, 15:49:57
缓存问题&更新策略
Redis面试

← 缓存问题&更新策略 Redis面试→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式