缓存最佳实践
# 缓存最佳实践
# 1.短信登录验证
业务流程:
- 发送短信验证码时,将生成的验证码存入redis中
- 登录校验时从redis取出验证码,和前端输入的数据进行比对,如果一致则根据用户号码生成用户信息,并将用户对象存入redis中,其中key设置为一个UUID的token,value为user对象序列化后的json数据。最终后端将token发送给前端,用于校验取出redis的用户数据,前端收到后每次请求都会在请求头authorization的字段。
- 设置两个拦截器,一个用于拦截所有路径,更新redis用户信息的过期时间;另一个则用于校验登录信息,判断ThreadLocal内用户是否为空。
注意,这里拦截器的StringRedisTemplate对象不能自动注入,因为拦截器不是spring自动生成的,因此需要通过构造函数来输入对象。需要在配置类里面通过@Resource进行自动装配。
# 2.解决缓存问题
①互斥锁:线程需要等待,保证一致性。使用setnx实现互斥锁,释放锁时则删除key
- 查redis:①有数据直接返回②有空数据直接返回(解决缓存穿透)
- 申请锁:①如果没有获得则重新进入函数②如果获得锁,再检查一次redis有数据直接返回(二次检查),redis没有则查数据库。
- 释放锁
public Shop queryWithmutex(Long id) {
//1.查redis
//2.申请锁查数据库
String lockkey = "lock" + id;
Shop shop = null;
try {
boolean b = tryLock(lockkey);
if (!b) {
Thread.sleep(50);
return queryWithmutex(id);
}
//二次查询数据库
String res = stringRedisTemplate.opsForValue().get(id.toString());
if (StrUtil.isNotBlank(res)) {
return JSONUtil.toBean(res, Shop.class);
}
shop = getById(id);
Thread.sleep(200);
if (shop == null) {
//解决缓存穿透
stringRedisTemplate.opsForValue().set(id.toString(), "",2,TimeUnit.MINUTES);
return null;
}
stringRedisTemplate.opsForValue().set(id.toString(), JSONUtil.toJsonStr(shop), 30, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
unlock(lockkey);
}
return shop;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
②逻辑过期:线程无需等待,性能好。不保证一致性。(互斥锁+线程池)

# 封装工具类代码复用
public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(json)) {
// 3.存在,直接返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值
if (json != null) {
// 返回一个错误信息
return null;
}
// 4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
return r;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 3.优惠券秒杀
订单表ID自增存在的问题:
- id的规律性太明显,用户容易猜到一天卖了多少单
- 受到数据量限制
基于Redis自增生成全局唯一ID:时间戳+自增id计数器
超卖问题:①查询库存②库存减一。由于访问临界资源时线程不互斥,因此会导致多个线程同时争夺同一个资源情况。
悲观锁:操作数据之前先获取锁,确保线程串行执行。
乐观锁:只有在更新数据时才去判断有没有其它线程对数据做了修改。库存减一后,版本号进行加一。
缺点:同一时刻进入大量请求,那么最终只能有一个线程能够修改库存成功(其它线程CAS发现库存和查找时的结果不一致,更新失败),失败率提高。
采用库存是否大于0判断代替CAS,条件没有那么严格。
boolean res = seckillVoucherService.update().setSql("stock=stock-1")
.eq("voucher_id", voucherId).gt("stock", 0).update();
2
# 4.分布式锁优化
一人一单场景:防止一个用户抢购多个秒杀券。因此需要在数据库减库存之前,在订单表中判断是否有当前user_id和voucher_id的购买记录。没有再进行库存减一。
- 问题:先查后改存在并发问题。
解决方案:对用户id添加synchronized锁,降低锁的粒度。此外由于toString函数每次都会new一个新的对象,因此需要使用intern根据常量池的变量进行判断。并且在整个事务提交完成后才能释放锁。执行事务时需要使用AopContext.currentProxy()来获取代理类,只有获取事务有关的代理对象,事务才能生效(spring内部实现事务本质是通过代理类),否则通过this调用没有事务功能。
synchronized(userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
@Transactional
public Result createVoucherOrder(Long voucherId) {}
2
3
4
5
6
- 问题:synchronized时jvm层面的锁,当在微服务集群模式下,同一个用户依旧可以在多个微服务节点上进行下多个秒杀券。
分布式锁:setnx+ex指定过期时间,防止redis宕机锁不能被释放,从而整个服务不可用。key可以设置为用户id+秒杀券id,value设置为线程id。
public boolean tryLock(long timeoutSec) {
long id = Thread.currentThread().getId();
Boolean success=stringRedisTemplate.opsForValue().
setIfAbsent(KEY_PREFIX+name, id+"", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
2
3
4
5
6
7
8
9
- 问题:线程1被阻塞,唤醒后释放线程2的锁
释放锁时,当前线程判断redis里面存放的锁是否是自己的当前线程的锁。其中key设置为用户id,value设置为UUID+线程id。
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
public boolean tryLock(long timeoutSec) {
String id = ID_PREFIX+Thread.currentThread().getId();
Boolean success=stringRedisTemplate.opsForValue().
setIfAbsent(KEY_PREFIX+name, id, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
public void unlock() {
String mylockid = ID_PREFIX+Thread.currentThread().getId();
String currlock = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if (mylockid.equals(currlock)) {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Redisson
可重入锁:一个线程多次想要获取锁。Redisson使用Hash结构代替了String结构,当第二次想要获取锁时,只需对应的field的value加一。

可重试锁:获取锁失败后,再次尝试重新获取。
# 异步秒杀优化
- 秒杀库存信息以及当前用户购买订单信息添加到redis中
- redis使用lua脚本判断库存充足,一人一单(目的保证原子性)
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列。开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能,添加数据库。
线程池初始化(@PostConstruct对象加载构造完执行)
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService seckill_order_handler = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
seckill_order_handler.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
VoucherOrder order = orderTasks.take();
//数据库实现下单业务
handleVoucherOrder(order);
} catch (InterruptedException e) {
log.error("处理订单");
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
整体业务流程
public Result seckillVoucher(Long voucherId) {
Long id = UserHolder.getUser().getId();
//执行lua脚本,结果为0则表示具有购买资格
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), id.toString());
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
VoucherOrder voucherOrder = new VoucherOrder();
long order = redisIdWorker.nextId("order");
voucherOrder.setId(order);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
//添加进阻塞队列
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService)AopContext.currentProxy();
return Result.ok(order);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Redis模拟MQ
1.基于list队列BRPOP,LPUSH
2.PubSub,支持发布订阅,多生产多消费
3.stream
# 5.点赞功能
- 场景:同一个用户只能点赞一次,再次点击则取消点赞。当前用户已经点赞,则点赞按钮高亮显示。
解决:使用Set集合来存储每个博客的点赞信息,其中key为博客id,成员value为每个点过赞用户id。
注意由于Zset没有判空函数,因此这里采用获取成员score函数来代替此功能,根据能否读出这个分数来判断当前用户是否点赞。
public Result likeBlog(Long id) {
Long userid = UserHolder.getUser().getId();
String key = "blog:liked:" + id;
Double score = stringRedisTemplate.opsForZSet().score(key, userid.toString());
if (score==null) {
boolean id1 = update().setSql("liked=liked+1").eq("id", id).update();
if (id1) {
stringRedisTemplate.opsForZSet().add(key, userid.toString(),System.currentTimeMillis());
}
} else {
boolean id1 = update().setSql("liked=liked-1").eq("id", id).update();
if (id1) {
stringRedisTemplate.opsForZSet().remove(key, userid.toString());
}
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 场景:按照点赞时间先后显示前五名用户
解决:使用Zset,并使用当前时间戳作为分数,添加进redis的用户越早时间戳越小。
注意这里如果只是in字段,那么select语句结果不能保证按照时间戳顺序,因此需要添加order by field字段,保证按照从set集合中取出来的id顺序。
public Result queryBlogLikes(Long id) {
String key = BLOG_LIKED_KEY + id;
Set<String> set = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (set == null || set.isEmpty()) {
return Result.ok(Collections.emptyList());
}
List<Long> ids = set.stream().map(Long::valueOf).collect(Collectors.toList());
String idstr = StrUtil.join(",", ids);
List<UserDTO> userDTO = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+idstr+")").list()
.stream().map(user-> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(userDTO);
}
2
3
4
5
6
7
8
9
10
11
12
13
# 6.共同关注
- 场景:查询用户和当前用户的共同关注
解决:使用Set集合的求交集功能。需要用redis保存用户关注列表,key是用户id,value是关注所有用户id。
public Result followCommons(Long id) {
Long id1 = UserHolder.getUser().getId();
String key = "follows:" + id1;
String key1= "follows:" + id;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key1);
if (intersect == null || intersect.isEmpty()) {
return Result.ok(Collections.emptyList());
}
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
List<UserDTO> collect = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(collect);
}
2
3
4
5
6
7
8
9
10
11
12
13
- 场景:关注推送,Feed流
解决:①每次添加更新博客文章时,向数据库插入文章。②从数据库中找出当前用户的粉丝(也就是关注当前用户的所有用户)③然后往Zset中存放推送文章数据,key为每个粉丝的用户id,每个集合成员为添加的博客id,score为当前时间戳。
添加博客-消息推送
public Result saveBlog(Blog blog) {
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
boolean issuccess = save(blog);
if (!issuccess) {
return Result.fail("新增笔记失败");
}
List<Follow> follows =followService.query().eq("follow_user_id", user.getId()).list();
for (Follow follow : follows) {
Long userId = follow.getUserId();
String key = "feed:" + userId;
stringRedisTemplate.opsForZSet().add(key, blog.getId().toString() , System.currentTimeMillis());
}
return Result.ok(blog.getId());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Zset滚动分页查询(max min offset count):在得分在max-min范围内,从max偏移offset个数据获取count个数据。
public Result queryBlogOfFollow(Long lastId, Integer offset) {
Long userId = UserHolder.getUser().getId();
String key = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, lastId, offset, 2);
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok();
}
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime=0;
int os=1;
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
ids.add(Long.valueOf(tuple.getValue()));
long time = tuple.getScore().longValue();
if (time == minTime) {
os++;
} else {
minTime = time;
os = 1;
}
}
String idstr = StrUtil.join(",", ids);
List<Blog> list = query().in("id", ids).last("ORDER BY FIELD(id," + idstr + ")").list();
for (Blog blog : list) {
queryBlogUser(blog);
isBlogLiked(blog);
}
ScrollResult r = new ScrollResult();
r.setList(list);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 7.用户签到
- 场景:使用数据库表存储签到表的数据量庞大
解决:使用位图来记录签到状态,一个bit对应一天的签到记录。
- 场景:签到统计,连续天数
解决:取出当月的签到记录(十进制),然后统计连续签到天数
public Result signCount() {
Long id = UserHolder.getUser().getId();
LocalDateTime now = LocalDateTime.now();
String key = "sign:" + id + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
int dayOfMonth = now.getDayOfMonth();
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (result == null || result.isEmpty()) {
return Result.ok(0);
}
Long num = result.get(0);
if (num == null || num == 0) {
return Result.ok(0);
}
int count = 0;
while (true) {
if ((num & 1) == 0) {
break;
} else {
count++;
}
num >>>= 1;
}
return Result.ok(count);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 8.UV统计
UV:独立访客量,统计浏览网页的用户数量,一天内同个用户访问多次只记录一次
PV:页面访问量,用户每访问一次,就统计一次。
解决:使用HyperLogLog
void testHyperLogLog() {
String userkey="user_"+UserHolder.getUser().getId();
stringRedisTemplate.opsForHyperLogLog().add("uv", userkey);
// 统计数量
Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
}
2
3
4
5
6