18509 字
93 分钟
Redis - 实战

短信登录#

基于Session实现登录的流程#

发送验证码:

用户在提交手机号后,会校验手机号是否合法,如果不合法,则要求用户重新输入手机号。

如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户

短信验证码登录、注册:

​ 用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息

校验登录状态:

用户在请求时候,会从cookie中携带JsessionId到后台,后台通过JsessionId从session中拿到用户信息,如果没有session信息,则进行拦截,如果有session信息,则将用户信息保存到threadLocal中,并且放行。

发送验证码

@Service
@Slf4j
public class UserInfoServiceImpl extends ServiceImpl<UserInfoMapper, UserInfo> implements IUserInfoService {
@Override
public Result sendCode(String phone, HttpSession session) {
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误");
}
// 生成验证码
String code = RandomUtil.randomNumbers(6);
// 保存验证码
session.setAttribute("code", code);
// 发送验证码
log.info("发送的验证码:{}", code);
return Result.ok();
}
}

登录

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
String code = loginForm.getCode();
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误");
}
if (RegexUtils.isCodeInvalid(code)) {
return Result.fail("手机号格式错误");
}
// 根据手机查用户
Object cacheCode = session.getAttribute("code");
if (cacheCode == null) {
return Result.fail("请填写验证码");
}
if (!cacheCode.equals(code)) {
return Result.fail("验证码错误");
}
// 用户是否存在
User user = lambdaQuery().eq(User::getPhone, phone).one();
// 不存在,注册
if (user == null) {
user = User.builder()
.phone(phone)
.nickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(6))
.build();
save(user);
}
// 存在,写到session
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
return Result.ok();
}
TIP

这里不需要返回登录凭证,session的原理是基于cookie,每一个session都会有一个唯一的sessionID,在访问tomcat时,这个session就已经写到cookie当中了,以后的每次请求都会带着sessionID

登录验证功能

public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
HttpSession session = request.getSession();
Object user = session.getAttribute("user");
if (ObjectUtil.isEmpty(user)) {
response.setStatus(401);
return false;
}
// 保存threadLocal
UserHolder.saveUser((UserDTO) user);
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
HandlerInterceptor.super.postHandle(request, response, handler, modelAndView);
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shot-type/**",
"/voucher/**"
);
}
}

集群的Session共享问题#

session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。

​ 每个tomcat中都有一份属于自己的session,假设用户第一次访问第一台tomcat,并且把自己的信息存放到第一台服务器的session中,但是第二次这个用户访问到了第二台tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的session,所以此时 整个登录拦截功能就会出现问题。早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了

但是这种方案具有两个大问题

1、每台服务器中都有完整的一份session数据,内存空间浪费。

2、session拷贝数据时,可能会出现延迟,会出现数据不一致情况。

session的替代方案应该满足:数据共享、内存存储,key、value结构。基于以上特性可以选择使用Redis代替Session。

基于Reids实现共享Session的登录#

验证码保存:由于验证码只是简单的数字,故用String类型存储即可 后续用户要提交手机号和收到的验证码进行验证,可以将key 设置为”phone:手机号”的形式,既方便读取Redis中的验证 码,又保证了每个登录用户key的唯一性

用户保存:使用Hash结构将用户对象保存到Redis 以随机token为key存储用户数据,保证key的唯一性

发送验证码

private final RedisTemplate<String, Object> redisTemplate;
@Override
public Result sendCode(String phone, HttpSession session) {
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误");
}
// 生成验证码
String code = RandomUtil.randomNumbers(6);
// 保存到redis
redisTemplate.opsForValue().set(RedisConstants.LOGIN_CODE_KEY + phone, code, RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);
// 发送验证码
log.info("发送的验证码:{}", code);
return Result.ok();
}

登录

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
String code = loginForm.getCode();
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误");
}
if (RegexUtils.isCodeInvalid(code)) {
return Result.fail("手机号格式错误");
}
// 根据手机查用户
String cacheCode = (String) redisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone);
if (cacheCode == null) {
return Result.fail("请填写验证码");
}
if (!cacheCode.equals(code)) {
return Result.fail("验证码错误");
}
// 用户是否存在
User user = lambdaQuery().eq(User::getPhone, phone).one();
// 不存在,注册
if (user == null) {
user = User.builder()
.phone(phone)
.nickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(6))
.build();
save(user);
}
// 生成token (可换jwt)
String token = UUID.randomUUID().toString(true);
// 写到redis
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);
String tokenKey = RedisConstants.LOGIN_USER_KEY + token;
redisTemplate.opsForHash().putAll(tokenKey, userMap);
redisTemplate.expire(tokenKey, 30, TimeUnit.MINUTES);
// 返回token
return Result.ok(token);
}

拦截器

@RequiredArgsConstructor
public class LoginInterceptor implements HandlerInterceptor {
private final StringRedisTemplate stringRedisTemplate;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
response.setStatus(401);
return false;
}
// 基于ToKEN获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
if (userMap.isEmpty()) {
response.setStatus(401);
return false;
}
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 刷新token有效期 (token续期)
stringRedisTemplate.expire(LOGIN_USER_KEY + token, 30, TimeUnit.MINUTES);
// 保存threadLocal
UserHolder.saveUser(userDTO);
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
HandlerInterceptor.super.postHandle(request, response, handler, modelAndView);
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
@Configuration
@RequiredArgsConstructor
public class MvcConfig implements WebMvcConfigurer {
private final StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shot-type/**",
"/voucher/**"
);
}
}

优化拦截器

当前拦截器有个问题,如果用户访问的一直都是放行的路径,那么就不会进入到拦截器中,所以拦截器中的token续期就无效了,所以我们需要再加一个拦截器(拦截所有路径的)

@RequiredArgsConstructor
public class RefreshInterceptor implements HandlerInterceptor {
private final StringRedisTemplate stringRedisTemplate;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}
// 基于ToKEN获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
if (userMap.isEmpty()) {
return true;
}
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 刷新token有效期 (token续期)
stringRedisTemplate.expire(LOGIN_USER_KEY + token, 30, TimeUnit.MINUTES);
// 保存threadLocal
UserHolder.saveUser(userDTO);
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
HandlerInterceptor.super.postHandle(request, response, handler, modelAndView);
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
UserDTO user = UserHolder.getUser();
if (user == null) {
response.setStatus(401);
return false;
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
HandlerInterceptor.super.postHandle(request, response, handler, modelAndView);
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}
@Configuration
@RequiredArgsConstructor
public class MvcConfig implements WebMvcConfigurer {
private final StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shot-type/**",
"/voucher/**"
).order(1);
registry.addInterceptor(new RefreshInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0); // 先执行
}
}

商户查询缓存#

缓存#

缓存就是数据交换的缓冲区(cache),是存贮数据的临时地方,一般读写性能较高

缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘,缓存可以大大降低用户访问并发量带来的服务器读写压力,降低响应时间

实际开发中,会构筑多级缓存来使系统运行速度进一步提升,例如:本地缓存与redis中的缓存并发使用

缓存的成本:

  • 数据一致性成本(需要保证缓存与数据库中数据的一致性)
  • 代码维护成本
  • 运维成本(为保证缓存高可用,需要搭建缓存集群,增加运维成本)

添加Redis缓存#

总体流程

业务流程

操作思路:查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis并将数据返回。

key设计为 “固定前缀+商铺id”的形式。

private final RedisTemplate<String, Object> redisTemplate;
@Override
public Result queryShopById(Long id) {
String shopJson = (String) redisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSON.parseObject(shopJson, Shop.class);
return Result.ok(shop);
}
Shop shop = getById(id);
if (ObjectUtil.isEmpty(shop)) {
return Result.ok();
}
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSON.toJSONString(shop));
return Result.ok(shop);
}

首页查询做缓存:

List做缓存

private final RedisTemplate<String, Object> redisTemplate;
@Override
public Result queryTypeList() {
List<Object> shopTypeListCache = redisTemplate.opsForList().range("cache:shopType", 0L, -1L);
if (!shopTypeListCache.isEmpty()) {
return Result.ok(shopTypeListCache);
}
List<ShopType> typeList = lambdaQuery()
.orderByAsc(ShopType::getSort)
.list();
if (typeList.isEmpty()) {
return Result.ok();
}
redisTemplate.opsForList().rightPushAll("cache:shopType", typeList.toArray());
return Result.ok(typeList);
}

typeList.toArray() 变成 ShopType[] 数组,rightPushAll(ShopType[] array) 会把数组中的每一个元素逐个写入 Redis;

如果是redisTemplate.opsForList().rightPushAll("cache:shopType", typeList);

rightPushAll(Collection<?> c) 会把整个集合当作一个元素推入 Redis 列表,得到的将会是(这样是不对的)

[
[ShopType1, ShopType2, ShopType3 ...]
]

缓存更新策略#

内存淘汰超时剔除主动更新
说明不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。编写业务逻辑,在修改数据库的同时,更新缓存。
一致性一般
维护成本
淘汰哪部分数据和淘汰的时机无法确定,如果旧数据一直为被淘汰,会造成数据的不一致一致性的强弱取决于所设置TTL的长短,同时如果在所设置的更新时间内发生数据更新,还是会造成数据的不一致更新可控性高,但需要编写额外业务逻辑

可以根据业务场景选择更新策略:

  • 低一致性需求:使用内存淘汰机制。如店铺类型的查询缓存
  • 高一致性需求:主动更新,并以超时剔除为兜底方案。如店铺详情查询的缓存。

主动更新策略主要有三种:Cache Aside模式、Read/Write Through模式、Write Behind Cahing模式。

  • Cache Aside(缓存旁路模式):由缓存调用者,在更新数据库的同时更新缓存。(代码复杂,但可人为控制)
  • Read/Write Through(读写穿透模式):缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题。(维护服务复杂,无现成服务)
  • Write Behind Caching(写回模式):调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致。(维护异步任务复杂,在异步进程修改数据库前,难以保证一致性,若服务器宕机,内存中的Redis数据将丢失 )

综合考虑,在企业中使用最多的策略是:Cache Aside。由调用者自己更新缓存。


操作缓存和数据库时有三个问题需要考虑:

  1. 删除缓存还是更新缓存?
  • 更新缓存:每次更新数据库都要更新缓存,无效的写操作多。
  • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存。

删除缓存,意思就是说,更新数据库以后直接将缓存中的旧数据直接删除了,等下一次查询在往缓存中存储,这种方式是更加合适的。

如果是更新缓存,假如我们往数据库进行了100次更新,那么redis就需要进行100次更新,如果这100次期间并没有人来访问,就会造成写多读少的问题。而删除缓存,则是直接删除缓存,等什么时候有人来访问,再来写入缓存,这样就不会造成写多读少的问题。


  1. 如何保证缓存与数据库的操作的同时成功或失败?
  • 单体系统:将缓存和数据库操作放在一个事务中。
  • 分布式系统:利用TCC等分布式事务方案(如:使用MQ通知其他服务进行数据同步)。

  1. 先操作缓存还是先操作数据库?(线程安全问题)

先删除缓存,再操作数据库。在多线程下可能会出现如下情况:

  • 线程1执行写操作,首先删除缓存,准备更新数据库(耗时较长)
  • 线程2查询数据,缓存未命中,读取数据库旧值并回填缓存。
  • 线程1完成数据库更新
  • 后序查询请求都会命中过期的缓存

从清空缓存到更新完数据库,整个过程耗时较长,其他线程很有可能在此期间读到数据库中的旧数据并写入缓存。缓存中存在旧数据,后续请求持续读到旧值,直到缓存过期或主动删除。

正常情况:

异常情况:

先操作数据库,再删除缓存。此时可能出现线程安全的情况如下:

  • 线程1执行写操作,先更新数据库,此时尚未删除缓存
  • 线程2查询数据,命中缓存中的旧数据,返回。
  • 线程1删除缓存
  • 后续请求查询缓存未命中,从数据库读取新值并回填缓存。

这种方法会造成短暂的数据不一致,但缓存删除后数据恢复一致。后续请求缓存新值,无长期问题。

正常情况:

异常情况:

还有另一种可能的情况:

  • 由于缓存过期或者首次查询,线程1查询缓存未命中,开始读取数据库v=10。
  • 在线程1读取数据库的过程中,线程2更新数据库为v=20,并删除缓存。
  • 线程2的删除缓存操作完成。
  • 线程1将读取到的旧数据v=10写入缓存。

但是写入Redis缓存的用时很短,不太可能在此期间完成更新数据库和删除缓存的可能,发生数据不一致的可能很小。

综合来看,方案二出现不一致性问题概率更低;


缓存更新策略的最佳实践方案:

1、低一致性需求:使用Redis自带的内存淘汰机制

2、高一致性需求:主动更新(Cache Aside),并以超时剔除作为兜底方案

  • 读操作:

    • 缓存命中则直接返回
    • 缓存未命中则查询数据库,并写入缓存,设定超时时间
  • 写操作:

    • 先写数据库,然后再删除缓存
    • 要确保数据库与缓存操作的原子性

实现商铺缓存#

查询这里增加时间

redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSON.toJSONString(shop), 30L, TimeUnit.MINUTES);

更新商品逻辑:

@Override
@Transactional(rollbackFor = Exception.class)
public Result updateShop(Shop shop) {
Long id = shop.getId();
if (id == null) {
return Result.fail("id不能为空");
}
// 更新数据库
updateById(shop);
// 删除缓存
redisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id);
return Result.ok();
}

缓存穿透#

缓存穿透指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

常见解决方案有两种:

  1. 缓存空对象

当我们发现请求的数据即不存在于缓存,也不存于与数据库时,将空值缓存到Redis,并设置过期时间,避免频繁查询数据库。

优点:

  • 实现简单,维护⽅便

缺点:

  • 额外的内存消耗;可能发生不一致问题(在TTL内真的有对应数据存入数据库中)

假如用户刚好请求了一个id,但是这个id的数据不存在,我们给缓存了个null,就在此时我们真的给这个id插入了一条数据,但是缓存中缓存的是null,出现了数据不一致的问题。

我们可以在新增数据的时候,我们主动把redis中的数据进行覆盖掉,也可以解决这个问题。

  1. 布隆过滤

  • 优点:内存占用较少,没有多余key
  • 缺点:
    • 实现复杂
    • 存在误判可能

解决缓存穿透问题

private final RedisTemplate<String, Object> redisTemplate;
@Override
public Result queryShopById(Long id) {
String shopJson = (String) redisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSON.parseObject(shopJson, Shop.class);
return Result.ok(shop);
}
// 命中的是否是空值
if (shopJson != null) {
return Result.fail("店铺不存在");
}
Shop shop = getById(id);
if (ObjectUtil.isEmpty(shop)) {
// 缓存空值到redis
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, "", 2L, TimeUnit.MINUTES);
return Result.fail("店铺不存在");
}
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSON.toJSONString(shop), 30L, TimeUnit.MINUTES);
return Result.ok(shop);
}

总结

缓存穿透产生的原因是什么?

  • 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

  • 缓存 null 值
  • 布隆过滤
  • 增强 id 的复杂度,避免被猜测 id 规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

缓存雪崩#

缓存雪崩是指在同⼀时段 ⼤量的缓存key同时失效 或者 Redis服务宕机,导致⼤量请求到达数据库,带来巨⼤压⼒。

与缓存击穿的区别:雪崩是很多key,击穿是某一个key缓存。

常见的解决方案有:

  • 由于设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效。因此给不同的Key在原本TTL的基础上添加随机值,这样KEY的过期时间不同,不会大量KEY同时过期
  • 利用Redis集群提高服务的可用性,避免缓存服务宕机
  • 给缓存业务添加降级限流策略(服务降级、快速失败等)
  • 给业务添加多级缓存,比如先查询本地缓存,本地缓存未命中再查询Redis,Redis未命中再查询数据库。

缓存击穿#

缓存击穿问题也叫热点Key问题,就是⼀个被 ⾼并发访问 并且 缓存重建业务较复杂 的key突然失效了,⽆数的请求访问会在瞬间给数据库带来巨⼤的冲击。

解决方案:

  • 互斥锁:给重建缓存逻辑加锁,避免多线程同时进行

当线程1发现缓存过期并尝试重建缓存时,首先获取互斥锁,再查询数据库并写入缓存,之后释放锁。在重建过程中,有其他线程也发现缓存过期并尝试重建时,会获取互斥锁失败,休眠一会再尝试查询缓存和获取锁的操作,直到查询到新的缓存数据时直接返回。

  • 逻辑过期:热点key不要设置过期时间,通过逻辑过期字段标识是否过期。

当一个线程发现缓存已经过期时,获取互斥锁进行缓存重建,与前一种方案不同的是,缓存重建时会创建新的线程去完成,重建完成后释放互斥锁,自己直接返回过期数据。在重建缓存过程中,有新线程发现缓存过期并尝试重建时,会获取锁失败,此时直接返回过期数据。

对比

解决方案优点缺点
互斥锁• 没有额外的内存消耗
• 保证一致性
• 实现简单
• 线程需要等待,性能受影响
• 可能有死锁风险
逻辑过期• 线程无需等待,性能较好• 不保证一致性
• 有额外内存消耗
• 实现复杂
  • 互斥锁不需要保存逻辑过期时间,没有额外的内存消耗,而逻辑过期需要额外维护一个逻辑过期时间,有额外的内存消耗。

互斥锁能够保证数据的强一致性,但由于锁的存在会降低并发性能;逻辑过期的方式优先保障高可用,性能好,但存在数据不一致情况。根据项目的实际需要选择合适的解决方案


利用互斥锁解决店铺详情查询的缓存击穿问题

需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题

我们可以利用setnx来实现互斥锁,setnx添加成功则返回1,添加失败则返回0;释放锁可以利用 del xxx来释放锁,当然我们使用setnx要注意给key设置过期时间,避免程序出现问题,导致锁永远无法释放,一般设置个10s足够了,业务重建缓存时间顶天1s不到;

实现思路:

进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取(构建缓存)

  1. 若获取锁成功,则再次检测redis缓存是否存在,做DoubleCheck,如果存在则无需重建缓存,如果不存在则查询数据库重建缓存。
  • 对于第一次获取就得到互斥锁的线程而言,再次检测redis缓存,结果还是不存在,然后重建缓存。
  • 对于上次获得锁失败的线程而言,本次获取锁成功,说明已经有线程完成缓存重建,再次查询缓存即可获得数据,不用再执行重建缓存操作。
  1. 若没有获取到互斥锁,则自旋等待一段时间后再次尝试获取锁,获取成功则回到 第1步;
@Override
public Result queryShopById(Long id) {
String shopJson = (String) redisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSON.parseObject(shopJson, Shop.class);
return Result.ok(shop);
}
// 命中的是否是空值
if (shopJson != null) {
return Result.fail("店铺不存在");
}
// 实现缓存重建
// 获取互斥锁
String lockKey = "lock:shop:" + id;
boolean isLocked = tryLock(lockKey);
// 是否获取成功
if (!isLocked) {
// 失败-休眠重试
ThreadUtil.sleep(50);
return queryShopById(id); // 递归重试
}
// Double-Check:第二次检查缓存
shopJson = (String) redisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSON.parseObject(shopJson, Shop.class);
return Result.ok(shop);
}
if (shopJson != null) {
return Result.fail("店铺不存在");
}
// 缓存仍未命中,查数据库
Shop shop = getById(id);
if (ObjectUtil.isEmpty(shop)) {
// 缓存空值到redis,防止穿透
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, "", 2L, TimeUnit.MINUTES);
return Result.fail("店铺不存在");
}
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSON.toJSONString(shop), 30L, TimeUnit.MINUTES);
// 释放互斥锁
unLock(lockKey);
return Result.ok(shop);
}
private boolean tryLock(String key) {
// 只有当 key 不存在时,才会设置 value,并返回 true; 如果 key 已经存在,则不会进行任何修改,并返回 false。
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", 10L, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag); // isTrue:只有当flag是true才是true,flag为false和null都返回false(避免拆箱操作报空指针)
}
private void unLock(String key) {
redisTemplate.delete(key);
}
TIP

为什么需要 double check?

当多个线程同时进入查询方法时,可能会发生:

  1. A 线程发现缓存没有,去尝试加锁。

  2. B 线程也发现缓存没有,但 A 拿到锁,B 没拿到锁。

  3. B 休眠后继续重试,但此时 A 已经把最新数据写入缓存了。

  4. 如果没有 double check,B 又会继续走完整流程 —— 白查数据库,造成竞争。

所以加锁之后必须再查一次缓存(第二次检查),避免重复构建缓存,减少数据库压力。

这样可以保证:

  1. 如果别人已经重建缓存,我们不用再查数据库

  2. 避免多线程重复重建缓存

  3. 性能更优,更安全


基于逻辑过期方式解决缓存击穿问题

对于热点key会提前添加到Redis缓存,不设置过期时间,而是设置逻辑过期时间,再次进行查询缓存时,一定会命中的。若未命中,则说明不是缓存预热的数据,直接返回空值即可。

整体思路:用户开始查询商铺,查询Redis判断是否命中,未命中直接返回空值即可。若命中,再通过逻辑过期时间判断数据是否过期,若未过期,直接返回Redis中的数据;若过期,则尝试获取互斥锁进行缓存重建。

  • 若互斥锁获取成功,再次检查Redis中的数据是否过期,做DoubleCheck。若未过期(已经有人重建完成),返回本次查询到的商铺数据;若仍过期(还没有人重建缓存),开启独立线程查询数据库进行缓存重建,自己返回过期数据。

  • 若获取互斥锁失败,返回过期数据即可。

现在redis中存储的数据需要带上过期时间,故重建一个实体类,避免对原来代码的修改

@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

做缓存预热

@SpringBootTest
class HmDianPingApplicationTests {
@Autowired
private ShopServiceImpl shopService;
@Test
void saveToRedis() {
shopService.saveShopToRedis(1L, 10L);
}
}
public void saveShopToRedis(Long id, Long expireSeconds) {
// 查询店铺数据
Shop shop = getById(id);
// 封装逻辑过期
RedisData redisData = RedisData.builder()
.data(shop)
.expireTime(LocalDateTime.now().plusSeconds(expireSeconds))
.build();
// 写入redis
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}

实现

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
@Override
public Result queryShopById(Long id) {
String shopJson = (String) redisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
if (StrUtil.isBlank(shopJson)) {
return Result.fail("不存在");
}
// 命中需要判断过期时间
RedisData redisData = JSON.parseObject(shopJson, RedisData.class);
Shop shop = JSON.parseObject(JSON.toJSONString(redisData.getData()), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
if (expireTime.isAfter(LocalDateTime.now())) {
// 未过期 返回店铺信息
return Result.ok(shop);
}
// 过期-重建缓存
// 获取互斥锁
String lockKey = "lock:shop:" + id;
boolean isLocked = tryLock(lockKey);
// 是否获取成功
if (isLocked) {
// 再次检测redis缓存是否过期,做DoubleCheck
String doubleCheckCacheStr = (String) redisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
RedisData parsed = JSON.parseObject(doubleCheckCacheStr, RedisData.class);
Shop shopObj = JSON.parseObject(JSON.toJSONString(parsed.getData()), Shop.class);
LocalDateTime newExpireTime = parsed.getExpireTime();
// 缓存未过期(已经有线程重建完成了),则返回数据
if (newExpireTime.isAfter(LocalDateTime.now())) {
return Result.ok(shopObj);
}
// 缓存仍过期 (还没有其他的线程重建缓存),创建独立线程,重建缓存
CACHE_REBUILD_EXECUTOR.submit(() -> {
saveShopToRedis(id, 20000L);
// 释放锁
unLock(lockKey);
});
}
// 返回过期的商铺信息
return Result.ok(shop);
}
private boolean tryLock(String key) {
// 只有当 key 不存在时,才会设置 value,并返回 true; 如果 key 已经存在,则不会进行任何修改,并返回 false。
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", 10L, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag); // isTrue:只有当flag是true才是true,flag为false和null都返回false(避免拆箱操作报空指针)
}
private void unLock(String key) {
redisTemplate.delete(key);
}
public void saveShopToRedis(Long id, Long expireSeconds) {
// 查询店铺数据
Shop shop = getById(id);
// 封装逻辑过期
RedisData redisData = RedisData.builder()
.data(shop)
.expireTime(LocalDateTime.now().plusSeconds(expireSeconds))
.build();
// 写入redis
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}

缓存工具封装#

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间

  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题

  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题

  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

  • 方法5:根据指定的key查询缓存,并反序列化为指定类型,需要利用互斥锁解决缓存击穿问题

package com.hmdp.utils;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* @ClassName CacheClient
* @Description 缓存工具类
* @Author zzy
*/
@Component
public class CacheClient {
//存入Reids中的空值的过期时间
public static final Long CACHE_NULL_TTL = 2L;
//存入Reids中的空值的过期时间的时间类型
public static final TimeUnit CACHE_NULL_TIME_UNIT = TimeUnit.MINUTES;
//互斥锁对应的key
public static final String LOCK_KEY = "lock:";
//获取互斥锁失败后的等待时间(单位毫秒)
public static final Long SPIN_WAIT_MILLISECOND = 50L;
//使用构造函数注入StringRedisTemplate
private final StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
//创建拥有十个线程的线程池,用来重建缓存,避免经常创建销毁线程
private final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
*
* @param key String类型的Key
* @param value 任意类型的对象
* @param time 过期时间
* @param timeUnit 时间单位
*/
public void set(String key, Object value, Long time, TimeUnit timeUnit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, timeUnit);
}
/**
* 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
*
* @param key String类型的Key
* @param value 任意类型的对象
* @param time 逻辑过期时间
* @param timeUnit 时间单位
*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit timeUnit) {
//设置逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
//写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
/**
* 据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
*
* @param keyPrefix key的前缀
* @param id id
* @param type 需要返回的对象的Class类型
* @param dbFallback 根据id进行数据库查询的函数
* @param time 过期时间
* @param timeUnit 时间单位
* @param <R> 需要返回的对象类型的泛型
* @param <ID> id的泛型
* @return
*/
public <R, ID> R queryWithPassThrough(String keyPrefix,
ID id,
Class<R> type,
Function<ID, R> dbFallback,
Long time,
TimeUnit timeUnit) {
//1、从redis中根据id查询商铺
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
//2、判断是否存在记录
if (StrUtil.isNotBlank(json)) {
//存在,返回数据
R r = JSONUtil.toBean(json, type);
return r;
}
//3、判断记录是否为空值
if (json != null) {
return null;
}
//4、查询数据库
R r = dbFallback.apply(id);
//5、数据库是否存在记录
if (r == null) {
//6、不存在,将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, CACHE_NULL_TIME_UNIT);
return null;
}
//7、存在,保存数据到redis,返回数据
this.set(key, r, time, timeUnit);
return r;
}
/**
* 根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
*
* @param keyPrefix key的前缀
* @param id id
* @param type 需要返回对象的Class类型
* @param dbFallback 根据id查询数据库
* @param time 逻辑过期时间
* @param timeUnit 时间单位
* @param <R> 需要返回的对象类型的泛型
* @param <ID> id的泛型
* @return
*/
public <R, ID> R queryWithLogicalExpire(String keyPrefix,
ID id,
Class<R> type,
Function<ID, R> dbFallback,
Long time,
TimeUnit timeUnit) {
//1、从redis中根据id查询商铺
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
//2、缓存未命中,返回空数据
if (StrUtil.isBlank(json)) {
return null;
}
//3、缓存命中
RedisData cacheData = JSONUtil.toBean(json, RedisData.class);
LocalDateTime expireTime = cacheData.getExpireTime();
R r = JSONUtil.toBean((JSONObject) cacheData.getData(), type);
//3.1、判断缓存是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
//3.2、缓存未过期,直接返回数据
return r;
}
//4、缓存过期,需要进行缓存重建
//4.1、尝试获取互斥锁
String lockKey = LOCK_KEY + id;
boolean isLock = tryLock(lockKey);
//4.2、互斥锁获取成功
if (isLock) {
//4.3、再次检测redis缓存是否过期,做DoubleCheck
String doubleCheckCacheStr = stringRedisTemplate.opsForValue().get(key);
RedisData redisData = JSONUtil.toBean(doubleCheckCacheStr, RedisData.class);
LocalDateTime newExpireTime = redisData.getExpireTime();
R newR = JSONUtil.toBean((JSONObject) redisData.getData(), type);
//4.3、缓存未过期(已经有线程重建完成了),则返回数据
if (newExpireTime.isAfter(LocalDateTime.now())) {
return newR;
}
//4.4 缓存仍过期 (还没有其他的线程重建缓存),创建独立线程,重建缓存
//将重建工作交给线程池完成
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
//查询数据库
R dbR = dbFallback.apply(id);
//重建缓存
this.setWithLogicalExpire(key, dbR, time, timeUnit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//4.5释放锁
unlock(lockKey);
}
});
}
//5、返回过期的商铺信息
return r;
}
/**
* 根据指定的key查询缓存,并反序列化为指定类型,利用互斥锁解决缓存击穿问题
*
* @param keyPrefix key的前缀
* @param id id
* @param type 需要返回对象的Class类型
* @param dbFallback 根据id查询数据库
* @param time 过期时间
* @param timeUnit 时间单位
* @param <R> 需要返回的对象类型的泛型
* @param <ID> id的泛型
* @return
*/
public <R, ID> R queryWithMutex(String keyPrefix,
ID id,
Class<R> type,
Function<ID, R> dbFallback,
Long time,
TimeUnit timeUnit) {
//1、从redis中根据id查询商铺
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
//2、判断是否存在记录
if (StrUtil.isNotBlank(json)) {
//存在,返回数据
R r = JSONUtil.toBean(json, type);
return r;
}
//3、判断记录是否为空值
if (json != null) {
return null;
}
//4、redis 查询结果为null缓存失效,尝试重建缓存
String lockKey = LOCK_KEY + id;
R dbR = null;
try {
//自旋等待,尝试获取互斥锁
while (!tryLock(lockKey)) {
Thread.sleep(SPIN_WAIT_MILLISECOND);
}
//4.2、获取锁成功,再次查询缓存
String newJson = stringRedisTemplate.opsForValue().get(key);
//缓存有效,直接返回
if (StrUtil.isNotBlank(newJson)) {
//存在,返回数据
return JSONUtil.toBean(newJson, type);
}
//4.3、缓存无效,查询数据库重建缓存
dbR = dbFallback.apply(id);
//数据库是否存在记录
if (dbR == null) {
//不存在,将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, CACHE_NULL_TIME_UNIT);
return null;
}
//存在,保存数据到redis,返回数据
this.set(key, dbR, time, timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//5、释放锁
unlock(lockKey);
}
//返回数据
return dbR;
}
/**
* 获取互斥锁,利用 setnx设置互斥锁,并设置锁的过期时间
*
* @param key
* @return
*/
public boolean tryLock(String key) {
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(isLock);
}
/**
* 释放互斥锁
*
* @param key
*/
public void unlock(String key) {
stringRedisTemplate.delete(key);
}
}

使用工具类优化解决之前店铺详情查询的缓存击穿和缓存穿透问题的代码

public Result queryShopById(Long id) {
// 通过缓存空值,缓存穿透
// Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY,
// id, Shop.class, this::getById, CACHE_SHOP_TTL,TimeUnit.MINUTES);
// 互斥锁解决缓存击穿
// Shop shop = cacheClient.queryWithMutex(CACHE_SHOP_KEY,id,Shop.class,
// this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);
// 逻辑过期解决缓存击穿
Shop shop = cacheClient.queryWithLogicalExpire(CACHE_SHOP_KEY,id,Shop.class,
this::getById,10L,TimeUnit.SECONDS);
if (shop == null) {
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}

优惠券秒杀#

全局唯—ID#

对于使用MySQL数据库的ID存在的问题:

  • id的规律性太明显,可能泄露数据
  • 受单表数据量的限制,数量过大后需要进行分库分表,会出现ID重复的情况,无法保证同类数据ID的唯一性。

全局ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足一下特性:

  • 唯一性
  • 高性能
  • 高可用
  • 安全性
  • 递增性

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

  • 符号位:1bit,为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

工具类

@Component
@RequiredArgsConstructor
public class RedisIdWorker {
private final RedisTemplate<String, Object> redisTemplate;
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列号位数
*/
private static final int COUNT_BITS = 32;
/**
* 生成全局唯一ID
*
* @param keyPerfix 相关业务的ID前缀
* @return
*/
public long nextId(String keyPerfix) {
// 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
Long count = redisTemplate.opsForValue().increment("icr:" + keyPerfix + ":" + date); // redis自增长的值是有上限的,是2^64
return timestamp << COUNT_BITS | count;
}
}

测试

创建含有300个线程的线程池,每个线程使用Reids的id生成器生成100个id,统计用时。

@Resource
private RedisIdWorker idWorker;
private ExecutorService es = Executors.newFixedThreadPool(300);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = idWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}

耗时1秒多,与机器性能有关

实现优惠券秒杀下单#

秒杀券下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

具体流程如下:

下单功能

@Service
@RequiredArgsConstructor
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private final ISeckillVoucherService seckillVoucherService;
private final RedisIdWorker redisIdWorker;
@Override
@Transactional(timeout = 5000, rollbackFor = Exception.class)
public Result seckillVoucher(Long voucherId) {
// 查优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = voucher.getBeginTime();
if (beginTime.isAfter(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
boolean flag = seckillVoucherService.update().setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.update();
if (!flag) {
return Result.fail("扣减失败");
}
// 创建订单
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = VoucherOrder.builder()
.id(orderId)
.userId(UserHolder.getUser().getId())
.voucherId(voucherId)
.build();
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
}

超卖问题#

理想情况:

错误分析:

​ 假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

  • 版本号法

给数据添加一个版本version字段,当数据修改时version加一,基于version字段判断有没有被修改过。每一次更新都必须满足更新前的 version == 当前数据库中的 version

  • CAS法

数据本身是否发生变化判断线程是否安全(比如库存数量)

乐观锁解决超卖

private final ISeckillVoucherService seckillVoucherService;
private final RedisIdWorker redisIdWorker;
@Override
@Transactional(timeout = 5000, rollbackFor = Exception.class)
public Result seckillVoucher(Long voucherId) {
// 查优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = voucher.getBeginTime();
if (beginTime.isAfter(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
// 扣减库存
boolean flag = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();
if (!flag) {
return Result.fail("扣减失败");
}
// 创建订单
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = VoucherOrder.builder()
.id(orderId)
.userId(UserHolder.getUser().getId())
.voucherId(voucherId)
.build();
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}

超卖这样的线程安全问题,解决方案有哪些?

  1. 悲观锁:添加同步锁,让线程串行执行

    • 优点:简单粗暴
    • 缺点:性能一般
  2. 乐观锁:不加锁,在更新时判断是否有其它线程在修改

    • 优点:性能好
    • 缺点:存在成功率低的问题 (where id = ? and stock > 0)

一人一单#

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

具体逻辑如下:首先查询优惠券,判断当前时间是否处于秒杀阶段,再进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单。

在JMeter多线程环境下测试,同样会出现线程安全的问题,多个线程第一次下单时,同时查询到当前不存在订单,然后各自去下单,还是会出现一个用户下了多个订单的情况。

这里没法使用乐观锁,因为数据根本就不存在,没法判断是否被修改过,我们要判断的是是否存在,只能用悲观锁方案;

加锁的粒度

首先把从验证一人一单 到 添加优惠券订单的逻辑抽取到一个方法createVoucherOrder中,在这个方法中进行查询订单、扣减库存并完成订单添加。

@Override
public Result seckillVoucher(Long voucherId) {
// 查优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = voucher.getBeginTime();
if (beginTime.isAfter(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
return createVoucherOrder(voucherId, userId);
}
@Transactional(timeout = 5000, rollbackFor = Exception.class)
public synchronized Result createVoucherOrder(Long voucherId, Long userId) { // 锁的对象是this
// 一人一单
Integer count = lambdaQuery()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId)
.count();
if (count > 0) {
// 用户至少下过一单
return Result.fail("已经购买过了");
}
// 扣减库存
boolean flag = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();
if (!flag) {
return Result.fail("库存不足");
}
// 创建订单
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = VoucherOrder.builder()
.id(orderId)
.userId(userId)
.voucherId(voucherId)
.build();
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}

如果在这个方法上加锁,在同一时刻只有一个线程可以执行该方法,每个线程对这个方法的访问变成了串行方式,性能降低。 加锁的初衷是为了解决同一个用户的线程安全问题,而不同用户应该互不受影响。因此需要降低锁的粒度,同一个用户加一把锁,不同用户加不同的锁,故可以对用户的id加锁。 (同一个用户才需要去判断并发安全问题,如果不是同一个用户不需要加锁)

为什么要用 synchronized (userId.toString().intern())

目的:让同一个 userId 的操作串行执行,不同 userId 并行执行 (单机环境下,以 userId 为粒度的本地同步锁,使同一个 userId 的操作串行化)

  • 同一个用户请求,只允许一个线程进入同步块
  • 不同用户之间互不影响,可以同时执行。
@Override
public Result seckillVoucher(Long voucherId) {
// 查优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = voucher.getBeginTime();
if (beginTime.isAfter(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
return createVoucherOrder(voucherId);
}
@Transactional(timeout = 5000, rollbackFor = Exception.class)
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 一人一单
Integer count = lambdaQuery()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId)
.count();
if (count > 0) {
// 用户至少下过一单
return Result.fail("已经购买过了");
}
// 扣减库存
boolean flag = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();
if (!flag) {
return Result.fail("库存不足");
}
// 创建订单
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = VoucherOrder.builder()
.id(orderId)
.userId(userId)
.voucherId(voucherId)
.build();
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
}

如果不加 intern(),会出什么问题?

synchronized (userId.toString())

虽然两次生成的字符串内容相等,但不是同一个对象,因为是 new 出来的不同实例,这样锁根本不生效

intern() 返回的是字符串常量池的同一个对象,JVM 会检查 字符串常量池(String Pool)里是否已经存在一个与 “xxx” 内容相同的字符串,如果存在:返回池中该字符串的引用,如果不存在:将 “xxx” 的字符串内容放入池中,并返回池中的引用

现在的逻辑是,先释放锁,再提交事务,这个事务是被Spring管理的,锁释放后意味着其他线程可以进来了,如果此时事物尚未提交,如果有线程进来查询订单了,而上一个订单还未写入数据库,此时查询订单就会发现不存在,然后再去创建订单,这就导致了订单重复的问题,依然存在并发安全问题;

所以我们应该是事物提交后再去释放锁,所以我们应该把锁放在这里;

@Service
@RequiredArgsConstructor
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private final ISeckillVoucherService seckillVoucherService;
private final RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
// 查优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = voucher.getBeginTime();
if (beginTime.isAfter(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional(timeout = 5000, rollbackFor = Exception.class)
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 一人一单
Integer count = lambdaQuery()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId)
.count();
if (count > 0) {
// 用户至少下过一单
return Result.fail("已经购买过了");
}
// 扣减库存
boolean flag = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();
if (!flag) {
return Result.fail("库存不足");
}
// 创建订单
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = VoucherOrder.builder()
.id(orderId)
.userId(userId)
.voucherId(voucherId)
.build();
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
}
TIP

如何让事务生效?

添加依赖

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

启动类打上注解

@EnableAspectJAutoProxy(exposeProxy = true)

接口加上

Result createVoucherOrder(Long voucherId);
synchronized (userId.toString().intern()) {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}

一人一单的并发安全问题 (分布式失效)

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

1.我们将服务启动两份,端口分别为8081和8082;

  1. 然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡;
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/json;
sendfile on;
keepalive_timeout 65;
server {
listen 8080;
server_name localhost;
# 指定前端项目所在的位置
location / {
root html/hmdp;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location /api {
default_type application/json;
#internal;
keepalive_timeout 30s;
keepalive_requests 1000;
#支持keep-alive
proxy_http_version 1.1;
rewrite /api(/.*) $1 break;
proxy_pass_request_headers on;
#more_clear_input_headers Accept-Encoding;
proxy_next_upstream error timeout;
#proxy_pass http://127.0.0.1:8081;
proxy_pass http://backend;
}
}
upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}
}
Terminal window
# win
nginx.exe -s reload

现在,用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题。

启动两端口的服务,使用同一个用户下两次单,在锁内部打上断点,debug结果如下:同一个用户在不同端口的服务上都成功获取到了锁,都可以进行下单操作。

正常理想情况:

集群问题:

在集群模式下或分布式系统,有多个JVM的存在,每个JVM内部都有自己的锁,导致每一个锁都有一个线程获取,就出现了并行运行;

分布式锁#

分布式锁的实现方案#

什么是分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

  • 可见性:多个jvm都能看到相同的结果。
  • 互斥:不管谁来访问,只能有一个人能拿到。
  • 高可用:大多数情况来获取锁都是成功的,不能说获取锁的时候经常出现问题。
  • 高性能(高并发):由于加锁本身就让性能降低,如果获取锁的动作又很慢,那么就太慢了,所以获取锁的动作必须高性能。
  • 安全性:考虑异常的情况,比如获取锁成功了,但是还没有释放锁,程序异常了怎么办(可能死锁)。

分布式锁的实现方案

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

  • mysql: 利用事务机制获取锁,如果有异常,事务就会回滚,则释放了锁;
  • redis: 利用redis的setnx命令获取锁,如果setnx成功,则获取锁,否则获取锁失败;释放锁即使把这个key删掉,其他人就能继续获取锁了,为了在服务出现故障后仍能自动释放锁,需要在添加key的时候设置过期时间(需要考虑看门狗);可用性也是很好既支持主从也支持集群;
特性MySQLRedisZookeeper
互斥利用mysql本身的互斥锁机制利用setnx这样的互斥命令利用节点的唯一性和有序性实现互斥
高可用
高性能一般一般
安全性断开连接,自动释放锁利用锁超时时间,到期释放临时节点,断开连接自动释放

利用Redis实现分布式锁#

实现分布式锁时需要实现的两个基本方法:

  • 获取锁

    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
      Terminal window
      # 添加锁,利用setnx的互斥特性
      SETNX lock thread1
      # 添加锁过期时间,避免服务岩机引起的死锁(过期时间要比业务时间长,否则业务没执行完锁就自动释放了)
      EXPIRE lock 10
      # 并且我们需要保证setnx和expire是原子操作,要么都成功要么都失败;
      # 不能分两步执行(先 SETNX 再 EXPIRE),因为在这两个命令之间如果发生进程崩溃、网络中断等故障,会导致锁没有设置过期时间,从而引发死锁。
      # 命令格式:
      SET key value NX EX seconds
      # 比如:
      SET lock:shop101 thread-001 NX EX 30
      # NX → 仅当 key 不存在时才设置(相当于 SETNX)
      # EX seconds → 设置 key 的过期时间(秒)
  • 释放锁

    • 手动释放

    • 超时释放:获取锁时添加一个超时时间

      Terminal window
      # 释放锁,删除即可
      DEL key

流程:


基于Redis实现分布式锁初级版本

需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。

public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功;false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}

实现:

public class SimpleRedisLock implements ILock {
private RedisTemplate<String, Object> redisTemplate;
private String name;
private static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(RedisTemplate<String, Object> redisTemplate, String name) {
this.redisTemplate = redisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
long threadId = Thread.currentThread().getId();
Boolean flag = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
@Override
public void unlock() {
redisTemplate.delete(KEY_PREFIX + name);
}
}

修改业务逻辑

@Override
public Result seckillVoucher(Long voucherId) {
// 查优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = voucher.getBeginTime();
if (beginTime.isAfter(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock simpleRedisLock = new SimpleRedisLock(redisTemplate, "order:" + userId);
boolean flag = simpleRedisLock.tryLock(1000);
if (!flag) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
simpleRedisLock.unlock();
}
}

Redis分布式锁误删问题

问题说明:

  • 持有锁的线程1在锁的内部出现了阻塞,导致他的锁TTL到期,自动释放
  • 此时线程2也来尝试获取锁,由于线程1已经释放了锁,所以线程2可以拿到
  • 但是现在线程1阻塞完了,继续往下执行,要开始释放锁了
  • 那么此时就会将属于线程2的锁释放,这就是误删别人锁的情况

有一些问题:

  • 业务阻塞,导致锁提前释放
  • 释放锁时,把别人的锁释放了(锁误删)

释放锁时应该做一个判断,解决方案就是在每个线程释放锁的时候,都判断一下这个锁是不是自己的,如果不属于自己,则不进行删除操作。

所以正确流程应该是:

解决Redis分布式锁误删问题

  1. 在获取锁时存入线程标示(可以用UUID表示)

  2. 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

    • 如果一致则释放锁
    • 如果不一致则不释放锁

完善后:

key:KEY_PREFIX + name,key的前缀KEY_PREFIX = “lock:“,再根据当前的业务名称传入name,二者拼接起来作为key,让不同的业务获取不同的锁。

value:应设置成当前线程的唯一标识,防止因线程阻塞导致锁自动释放后再去执行释放锁操作,将别的线程锁设置的锁误删。此处的唯一标识设置为UUID + 线程id的形式,当要主动释放锁时,先获取对应的value值,判断与自己的唯一标识是否相同,相同则删除该key释放锁,否则不用处理。

当前线程的id不能作为线程的唯一标识。每个JVM实例都会为在其内部创建的每个线程分配一个唯一的标识符(通常是一个递增的长整型数字)。当有多个JVM实例运行时,每个JVM实例的线程标识符空间是独立的,所以不同JVM实例中的线程可能会有相同的标识符。

public class SimpleRedisLock implements ILock {
private RedisTemplate<String, Object> redisTemplate;
private String name;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public SimpleRedisLock(RedisTemplate<String, Object> redisTemplate, String name) {
this.redisTemplate = redisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean flag = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
@Override
public void unlock() {
// 线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
String redisThreadId = (String) redisTemplate.opsForValue().get("KEY_PREFIX + name");
// 标识是否一致
if (threadId.equals(redisThreadId)) {
redisTemplate.delete(KEY_PREFIX + name);
}
}
}

分布式锁的原子性问题

由于判断标识是否一致和释放锁是两个操作,可能会在判断标识一致后发生线程阻塞并且阻塞时间过长导致锁超时释放,其他线程就会获取锁成功。而当阻塞完成时的线程会直接去释放锁(之前已经判断过是一致的),此时就会释放其他线程的锁(又发生了误删),从而可能引发线程安全问题。因此需要一种机制保证这两个Redis操作的原子性 --- Redis的Lua脚本

JVM的垃圾回收机制在FULL GC时会暂时阻塞所有线程‌

阻塞导致误删情况:

逻辑解释:

  • 假设线程1已经获取了锁,在判断标识一致之后,准备释放锁的时候,又出现了阻塞(例如JVM垃圾回收机制)
  • 于是锁的TTL到期了,自动释放了
  • 那么现在线程2趁虚而入,拿到了一把锁
  • 但是线程1的逻辑还没执行完,那么线程1就会执行删除锁的逻辑
  • 但是在阻塞前线程1已经判断了标识一致,所以现在线程1把线程2的锁给删了
  • 那么就相当于判断标识那行代码没有起到作用
  • 这就是删锁时的原子性问题

Redis调用Lua脚本#

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编 程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

Redis提供的调用函数,语法如下:

# 执行 Redis 命令
# 执行redis命令
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行 set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行 set name Rose,再执行 get name,则脚本如下:

# 先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

Terminal window
127.0.0.1:6379> help @scripting
EVAL script numkeys key [key ...] arg [arg ...]
summary: Execute a Lua script server side
since: 2.6.0

例如,我们要执行redis.call(‘set’,‘name’,jack’)这个脚本,语法如下:

Terminal window
127.0.0.1:6379> EVAL "return redis.call('set','name','Jack')" 0
OK
127.0.0.1:6379> get name
"Jack"

如果脚本中的key、Value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在 脚本中可以从KEYS和ARGV数组获取这些参数:

127.0.0.1:6379> EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name Lucy
OK
127.0.0.1:6379> get name
"Lucy"
127.0.0.1:6379>
WARNING

lua语言中数组角标从1开始

释放锁的业务流程是这样的:

  1. 获取锁中的线程标示
  2. 判断是否与指定的标示(当前线程标示)一致
  3. 如果一致则释放锁(删除)
  4. 如果不一致则什么都不做

如果用Lua脚本来表示则是这样的:

-- 获取锁中的线程标示
local id = redis.call('GET', KEYS[1])
-- 比较线程标示与锁中的标示是否一致
if(id == ARGV[1]) then
return redis.call('DEL', KEYS[1])
end
return 0

Java调用Lua脚本#

RedisTemplate调用Lua脚本的API如下:

其中,将KEY类型参数放在了一个List中,通过该List可以知道传入的KEY类型参数的个数,就不用原指令中的numkeys参数了。

使用步骤:

先在classpath路径下建一个lua脚本

-- 获取锁中的线程标示
local id = redis.call('GET', KEYS[1])
-- 比较线程标示与锁中的标示是否一致
if(id == ARGV[1]) then
return redis.call('DEL', KEYS[1])
end
return 0

使用redisTemplate或stringRedisTemplate调用lua脚本:

package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
private RedisTemplate<String, Object> redisTemplate;
private String name;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); // 指定classpath文件路径
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(RedisTemplate<String, Object> redisTemplate, String name) {
this.redisTemplate = redisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean flag = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
@Override
public void unlock() {
// 调用lua脚本
redisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
// @Override
// public void unlock() {
// // 线程标识
// String threadId = ID_PREFIX + Thread.currentThread().getId();
// String redisThreadId = (String) redisTemplate.opsForValue().get("KEY_PREFIX + name");
// // 标识是否一致
// if (threadId.equals(redisThreadId)) {
// redisTemplate.delete(KEY_PREFIX + name);
// }
// }
}
public interface ILock {
/**
*尝试获取锁
* @param timeout 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功;false代表获取锁失败
*/
boolean tryLock(long timeout);
/**
* 释放锁
*/
void unlock();
}

解决了阻塞情况下锁误删情况;

基于Redis的分布式锁实现思路:

  • 利用 set nx ex 获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用 set nx 满足互斥性
  • 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用 Redis 集群保证高可用和高并发特性

Redisson#

基于setnx实现的分布式锁存在下面的问题:

  • 不可重入 同一个线程无法多次获取同一把锁

可重入锁是指同一个线程可以多次获取同一把锁

比如,我有一个方法a,需要先获取锁,然后执行业务并调用方法b,而方法b里也要获取同一把锁。如果锁是不可重入的,那么在方法a中获取锁后,调用方法b时再次尝试获取这把锁就会失败,此时会等待锁的释放。但由于方法a还未执行完,仍在调用b,所以会出现死锁。因此,在这种场景下,要求锁必须是可重入的。

  • 不可重试 获取锁只尝试一次就返回 false,没有重试机制

我们之前所实现的锁是非阻塞的,如果失败,则立即返回

  • 超时释放 锁超时释放虽然可以避免死 锁,但如果是业务执行耗时 较长,也会导致锁释放,存 在安全隐患

如果时间太短,业务还没执行完,锁就释放了,就有可能发生我的业务还在执行,其他线程就获取到锁了;如果设置时间太长,将来如果我这个业务出现故障,那么很长一段时间其他线程都不能获取到锁,都在等待锁超时释放;

  • 主从一致性 如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的 锁数据,则会出现锁实现

在某些情况下,比如现在有一个线程在主节点获取了锁。由于加锁是 set 操作,是一个写操作,这个写操作在主节点完成后,如果还没有同步到从节点,因为存在延迟,主节点突然宕机了。此时会选一个新的从节点作为主节点,而这个从节点因为没有完成同步,所以没有锁的标识。这样其他线程可能会趁机获取到锁,导致出现多个线程持有锁的情况。在极端情况下可能会出现安全问题。当然,这种情况发生的概率很低,因为主从同步的延迟通常非常低,往往在毫秒级别甚至更低。

因此,前面提到的这4个问题,要么发生概率极低,要么并不是所有业务都有这样的需求,有些业务需要,有些业务不需要。


Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式 的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

是在Redis基础上实现的分布式工具的集合,在分布式系统下用的工具,它都有。(分布式锁就是它的一个子集)

Redis提供了分布式锁的多种多样功能

  1. 可重入锁(Reentrant Lock)
  2. 公平锁(Fair Lock)
  3. 联锁(MultiLock)
  4. 红锁(RedLock)
  5. 读写锁(ReadWriteLock)
  6. 信号量(Semaphore)
  7. 可过期性信号量(PermitExpirableSemaphore)
  8. 闭锁(CountDownLatch)

官网地址:https://redisson.org

GitHub地址:https://github.com/redisson/redisson

Redisson快速入门#

引入依赖

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

配置Redisson客户端

@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();
// 添加 redis 地址,这里添加了单点的地址,也可以使用 config.useClusterServers() 添加集群地址
config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");
// 创建客户端
return Redisson.create(config);
}
}

使用:

@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if(isLock){
try {
System.out.println("执行业务");
}finally {
// 释放锁
lock.unlock();
}
}
}

业务改进:

private final RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 查优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = voucher.getBeginTime();
if (beginTime.isAfter(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 未开始
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
// 创建锁对象
// SimpleRedisLock simpleRedisLock = new SimpleRedisLock(redisTemplate, "order:" + userId);
RLock lock = redissonClient.getLock("order:" + userId);
boolean flag = lock.tryLock(); // 立即尝试获取锁,获取不到直接返回 false,且不重试
if (!flag) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
// simpleRedisLock.unlock();
}
}
@Transactional(timeout = 5000, rollbackFor = Exception.class)
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 一人一单
Integer count = lambdaQuery()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId)
.count();
if (count > 0) {
// 用户至少下过一单
return Result.fail("已经购买过了");
}
// 扣减库存
boolean flag = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId)
.gt("stock", 0) // where id = ? and stock > 0
.update();
if (!flag) {
return Result.fail("库存不足");
}
// 创建订单
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = VoucherOrder.builder()
.id(orderId)
.userId(userId)
.voucherId(voucherId)
.build();
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}

Redisson可重入锁原理#

// 创建锁对象
RLock lock = redissonClient.getLock("lock");
@Test
void method1() {
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败, 1");
return;
}
try {
log.info("获取锁成功, 1");
method2();
} finally {
log.info("释放锁, 1");
lock.unlock();
}
}
void method2(){
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败, 2");
return;
}
try {
log.info("获取锁成功, 2");
} finally {
log.info("释放锁, 2");
lock.unlock();
}
}
  1. method1() 成功加锁(第一次 tryLock() 返回 true)。
  2. method1() 调用 method2(),此时当前线程已持有锁。
  3. method2() 再次调用 lock.tryLock():
    • 由于是同一个线程,Redisson 允许重入,tryLock() 返回 true。
    • 锁的内部重入计数(hold count)变为 2。
  4. method2() 执行完,调用 lock.unlock():
    • 重入计数减为 1。
    • 锁并未真正释放(因为计数 > 0)。
  5. 回到 method1() 的 finally 块,再次 lock.unlock():
    • 重入计数减为 0,锁真正释放。

如果是不可重入的:method1在方法内部调用method2(method1和method2出于同一个线程),那么method1已经拿到一把锁了,想进入method2中拿另外一把锁,必然是拿不到的,于是就出现了死锁;

Redisson可重入锁原理

利用Hash结构记录线程ID和重入次数‌,使用计数器维护锁状态

当同一个线程第一次获取锁时,Redis会记录下这个线程的ID,并将锁的持有次数设置为1。如果这个线程再次请求锁(即可重入操作),Redisson会检测到当前持有锁的线程ID与当前线程相同,于是不会重新设置锁,而是增加计数器,表示这个线程再次持有了锁‌。

并且同一线程可以多次获取同一个锁,且只有当所有锁释放操作都完成后,锁才会真正释放。Redisson通过维护一个计数器来实现这一特性。每次释放锁时,Redisson会减少计数器,只有当计数器减为0时,锁才会真正释放‌。

步骤:

获取锁的Lua脚本

local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程的唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断锁是否存在
if (redis.call('exists',key) == 0) then
-- 锁不存在,获取锁
redis.call('hset', key, threadId, 1);
--设置有效期
redis.call('expire', key, releaseTime);
-- 返回结果
return 1;
end
--锁已存在,判断锁是否是自己的
if (redis.call('hexists',key,threadId) == 1) then
--锁是自己的,重置过期时间,计数器加一
redis.call('hincrby',key,threadId,1);
redis.call('expire',key,releaseTime);
return 1;
end
return 0; -- 锁不是自己的,获取锁失败

释放锁的Lua脚本

local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程的唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断锁是否是自己的
if (redis.call('hexists',key,threadId) == 0) then
--锁不是自己的,直接返回
return nil;
end
-- 是自己的锁,则重入次数减一
local count = redis.call('hincrby',key,threadId,-1);
-- 判断计数器是否已减为0
if (count > 0) then
-- 计数器不为0,不能释放锁,重置锁有效期后返回
redis.call('expire',key,releaseTime);
return nil;
else
-- 计数器减为零,释放锁
redis.call('del',key);
return nil;
end

Redisson的锁重试和WatchDog机制#

参数解释:

tryLock方法会接受等待时间超时时间时间单位三个参数

private final RedissonClient redissonClient;
RLock lock = redissonClient.getLock("order:" + userId);
boolean flag = lock.tryLock(); // 不等待,立即尝试获取锁,获取不到直接返回 false,且不重试
if (!flag) {
// 获取锁失败
}
// 最多等待10秒获取锁,成功后锁自动30秒过期
boolean flag = lock.tryLock(10, 30, TimeUnit.SECONDS);
// 最多等待10s,如果10s内获取不到锁就失败
flag = lock.tryLock(10, TimeUnit.SECONDS); // 如果不传leaseTime参数,默认是30秒(看门狗的默认时间),锁过期

锁重试是利用消息订阅和信号量机制,来实现锁重试的,它不是无休止的盲等机制;

在tryLock的源码中可以看到,释放锁的操作(lua脚本),在执行del语句后通过publish来发送释放锁的信号

看门狗机制

WatchDog机制,也称为看门狗机制,通过动态续约锁的过期时间,确保了分布式锁在持有者未主动释放之前不会被其他线程获取,从而有效防止了因锁超时而导致的线程安全问题、数据一致性问题。

分布式锁通常带有一个过期时间(TTL)来防止因锁未释放而导致的死锁问题。然而,业务逻辑执行时间可能超过锁的默认过期时间,如果没有扩展锁的时间,锁会自动过期并释放,导致其他线程获得锁,进而引发数据一致性问题。WatchDog机制的作用就是动态续约锁的过期时间,确保锁在持有者未主动释放之前不会被其他线程获取‌。

若获取锁时没有指定锁的自动释放时间,leaseTime参数默认为-1,在异步获取锁时会将锁的自动释放时间设置为WatchdogTimeout,默认为30s。一旦锁被获取,Redisson会启动一个WatchDog定时任务。这个定时任务每隔一段时间(通常是10秒)会检查锁的状态,如果锁仍然有效,此时这个timeTask 就触发了,它会自动将锁的持有时间再延长30秒,如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timeTask,完成不停的续约。这样,即使业务逻辑执行时间超过了锁的初始过期时间,锁也不会被自动释放‌。

当客户端完成需要锁定的操作后,会手动释放锁,并删除定时任务。如果客户端在操作过程中发生异常或崩溃,WatchDog也会在锁的持有时间结束后自动释放锁,以避免死锁的发生‌。

获取锁释放锁逻辑:

Redisson 分布式锁原理:

  • 可重入:利用 hash 结构记录线程 id 和重入次数
  • 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用 watchDog,每隔一段时间(releaseTime / 3),重置超时时间

总的来说解决了不可重入、不可重试、超时释放的问题


Redisson锁的MultiLock原理

为了提高Redis的可用性,我们会搭建集群或者主从,以主从为例,主节点负责增删改,从节点负责读,主机会将数据同步给从机,但在主从同步完成之前,如果主节点宕机,Redis的哨兵机制会选择一个新的从节点作为主节点。然而,这个新的主节点上并没有之前的锁信息,导致锁失效。这样,当新的线程发来请求时,又可以获取到锁,从而出现两个线程并发访问安全问题。

为了解决这个问题,Redisson提出了‌MultiLock‌锁(联锁)。MultiLock锁不使用主从关系,而是将每个Redis节点都视为独立的节点,都可以进行读写操作。在获取锁时,需要在所有的Redis服务器上都要获取锁,只有所有的服务器都写入成功,才算是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功。这样,即使某个节点宕机 ,由于其他节点上仍然保留有锁的标识,因此新的线程无法在所有节点上都获取到锁,从而保证了锁的一致性和安全性‌。

测试

我们先使用虚拟机额外搭建两个Redis节点

@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.137.130:6379")
.setPassword("root");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
Config config = new Config();
config.useSingleServer().setAddress("redis://92.168.137.131:6380")
.setPassword("root");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
Config config = new Config();
config.useSingleServer().setAddress("redis://92.168.137.132:6381")
.setPassword("root");
return Redisson.create(config);
}
}

使用联锁,我们首先要注入三个RedissonClient对象

@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
private RLock lock;
@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
lock = redissonClient.getMultiLock(lock1, lock2, lock3); // 用哪一client调用都没有问题,都是一样的;并且这里的每一个节点的锁都是可重入锁
}
@Test
void method1() {
boolean success = lock.tryLock();
redissonClient.getMultiLock();
if (!success) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}
void method2() {
RLock lock = redissonClient.getLock("lock");
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}

所谓的联锁,就是多个独立的锁,而每一个独立的锁,跟之前的原理是一样的;

总结

1)不可重入Redis分布式锁:

  • 原理:利用 setnx 的互斥性;利用 ex 避免死锁;释放锁时判断线程标示
  • 缺陷:不可重入、无法重试、锁超时失效

2)可重入的Redis分布式锁:

  • 原理:利用 hash 结构,记录线程标示和重入次数;利用 watchDog 延续锁时间;利用信号量和消息订阅控制锁重试等待
  • 缺陷:redis宕机引起锁失效问题、主从延迟或宕机导致节点未同步锁问题

3)Redisson的multiLock:

  • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
  • 缺陷:运维成本高、实现复杂
TIP

EX 如何避免死锁?

  • EX 是 Redis SET 命令的一个选项,用于 设置 key 的过期时间(单位:秒)
    即使持有锁的客户端异常退出、或忘记调用 unlock()、业务异常无法释放锁,Redis 也会在设定的 EX 时间后 自动删除该锁 key,从而释放锁,让其他客户端有机会重新获取。利用 EX 设置锁的自动过期时间,相当于给分布式锁加了一个“安全熔断机制”——即使程序出错,锁也能在有限时间内自动释放,从而有效避免死锁。

利用信号量控制锁重试等待

在分布式系统中,当多个客户端竞争同一把锁时,未抢到锁的客户端通常需要“重试”(即过一会儿再尝试获取)。但如果每个客户端都无限制地、高频地轮询重试,会导致:

  • Redis 服务器压力剧增(大量无效请求)
  • 客户端 CPU 空转
  1. 信号量作为协调器
    • 当客户端首次 tryLock() 失败时,不立即返回失败,而是向 Redis 注册一个“等待许可”请求。
    • Redisson 在 Redis 中维护一个与锁关联的 信号量,记录有多少客户端在排队等待。
  2. 释放锁时唤醒等待者
    • 当持有锁的客户端调用 unlock() 时,除了删除锁 key,还会 释放一个信号量许可release())。
    • 此操作会 通知(或唤醒)一个正在等待的客户端,让它立即重试获取锁,而不是被动轮询。
  3. 支持超时与公平性
    • 客户端可设置最大等待时间(如 tryLock(10, 30, TimeUnit.SECONDS):最多等 10 秒,持锁 30 秒)。
    • Redisson 默认按 FIFO(先到先得) 唤醒等待者,保证公平性。

Redis优化秒杀#

Redis消息队列实现异步秒杀#

达人探店#

好友关注#

附近的商户#

用户签到#

UV统计#

Redis - 实战
https://zzyang.top/posts/redis-actualcombat/
作者
张小阳
发布于
2025-12-18
许可协议
CC BY-NC-SA 4.0

评论区

评论区加载中...

如果长时间无法显示,请尝试刷新页面。