【DB系列】Redis实现分布式锁(应用篇)

文章目录
  1. I. 分布式锁
    1. 1. 介绍
    2. 2. 应用场景
    3. 3. DB版分布式锁
  2. II. Redis版分布式锁
    1. 1. 实现
    2. 2. 测试
  3. II. 其他
    1. 0. 项目
    2. 1. 一灰灰Blog

分布式锁,应用场景可以说是比较多的,网上随便一查就能搜到很多的文章;那么这里为什么跟着来一篇呢,原因也很简单,别人写的再好再清晰,那也是别人的知识点;自己思考过的、写过的才属于自己;

本篇虽然名为redis实现分布式锁,但也会说一下基于数据库实现分布式锁的思路(至于ZK实现的分布式锁,没玩过zk,就不抄了)

I. 分布式锁

1. 介绍

锁,这个名词或者动词可以说频繁的出现在我们的编程生涯中,当然我们最常见的就是单体应用中的synchronized以及Lock,主要就是为了确保某一段代码逻辑(特别是写逻辑)一次只能被一个业务方调用执行

而分布式锁最大的特点就是,希望即便是在分布式的环境中,即便有多个应用、多个实例操作某段业务逻辑,也能确保这段逻辑的”串行”执行

2. 应用场景

分布式锁最主要的特点就是希望能确保,在某一时间段内,只能有一个业务方,访问某段业务逻辑

从上面的描述,一个非常典型的case就是电商里面的下单减库存

我们在创建订单之前,得确保库存足够,假设有这么一个场景

  • 商品只有2个库存
  • 用户A,下单买两个,去判断库存为2,可以购买
  • 用户B,下单买1个,去判断库存,发现也是2,可以购买
  • 当用户A,B的订单都完成了,最终库存变成-1了,导致超卖

如果我们使用分布式锁,把这段逻辑包裹住

  • 用户A,下单买2个,抢占分布式锁成功,判断库存为2
  • 用户B,下单买1个,抢占分布式锁,已经被A占用了,抢不到,等待;直到用户A释放
  • 用户A,订单生成,库存-2,现在真实库存变为0,释放锁
  • 用户B,多次尝试获取锁,成功之后,再去看库存,为0,不满足,下单失败

3. DB版分布式锁

这种方式我个人没有用过,根据网上查询到一些资料,从原理上进行简单的说明

唯一键约束方式

借助mysql的唯一键约束,确保一次只能有一个insert sql是成功的,操作成功的就认为是成功的抢了锁;如果插入失败,则表示没有抢占;删除这条记录就表示释放锁

这种方式实现比较简单,但是问题比较多

  • 如果抢占锁的小伙伴一直不删除这条记录,那这个锁就永不释放么?
  • 抢占锁失败直接抛异常,一个是不友好,另外一个就是非阻塞方式,需要我们自己来循环的判断是否
  • 非重入的,即持有锁的小伙伴,再次去获取锁的时候,也是失败
  • 性能瓶颈

乐观锁

在数据库中添加一个version字段,在修改的时候,加一个version的查询限定,一把的业务逻辑为

  • select * from table where id=1
  • 执行业务逻辑
  • update xxx, version=version+1 wherer id=1 and version=oldVersion 如果执行成功,则表示正确持有锁,业务流程ok;如果失败,则表示没有抢占到锁,回滚

上面这个做法,有个比较明显的问题,没有阻塞操作,和我们预期的分布式锁差别有点大

悲观锁

如果想解决上面的阻塞问题,我们可以考虑使用写锁

  • 开启事务
  • select * from table where id=1 for update
  • 执行业务逻辑
  • 提交事务,释放锁

上面是利用数据库的写锁来完成排他性,同样存在锁释放问题

小结

上面的几个思路主要来自于网络上的一些博文,就我个人看完的观点,基于db的分布式锁绝不是一个优雅的选择方案,如非万不得已,不要这么干

II. Redis版分布式锁

关于redis实现分布式锁的方案由来已久了,主要是借助redis的单线程模型,以及命令执行的原子性,通过确保同一时刻,只能有一个setnx成功,即表示抢占到锁;其他失败的小伙伴只能遗憾的加入下一次的抢锁计划

为了避免持有锁的小伙伴因为异常挂掉没有释放锁,从而导致其他客户端都拿不到锁的问题,因此在抢占锁的时候,我们需要考虑设置有效期;幸运的是redis已经支持一个命令执行上面的过程了

1. 实现

redis分布式锁

设置锁和释放锁两个方面;

  • setnx:当不存在时,设置成功;存在时,设置失败
  • 为了防止持有锁的客户端挂掉,没有释放锁,从而导致其他客户端都拿不到锁的case,我们需要设置锁的有效期

这里我们借助SET key value [EX seconds | PX milliseconds] [NX | XX] [KEEPTTL]来实现原子的操作

下面是基于RedisTemplate来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static final String ERROR_CODE = "error";
@Autowired
private RedisTemplate redisTemplate;

private Random random;

public RedisDistributeLock() {
random = new Random();
}

private String randPrefix() {
return String.format("%04d", random.nextInt(10000));
}

public String tryLock(String lockKey, long expireSeconds, int maxRetryTime) {
// 为了避免value冲突,加一个随机的前缀串
String value = randPrefix() + "_" + (System.currentTimeMillis() + expireSeconds * 1000 + 1);
boolean ans;
int retryTimes = 0;
do {
ans = redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireSeconds, TimeUnit.SECONDS);
if (ans) {
return value;
}

retryTimes++;
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
} while (retryTimes < maxRetryTime);

return ERROR_CODE;
}

释放锁的时候需要注意,持有者不能把别人的锁给释放掉了(比如A持有了锁,超时时间为5s,但是它的业务逻辑超过了5s,导致B也获取到了锁,如果这时候A执行完了,把B的锁删掉,那就gg了)

因此,删除的重点是,只能删自己的锁(这里就需要借助lua脚本来执行原子操作了)

1
2
3
4
5
6
7
8
public boolean release(String lockKey, String value) {
//释放锁的lua脚本,保证判断和删除操作的原子性
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
@SuppressWarnings("unchecked")
RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class);
return (boolean) redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value);
}

2. 测试

我们写一个简单的并发扣库存的测试case,主要的逻辑就是当前的库存小于购买数时,下单失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@SpringBootApplication
public class Application {

private AtomicInteger count = new AtomicInteger(35);

/**
* 在一个线程持有锁的过程中,不允许其他的线程持有锁
*
* @param redisDistributeLock
* @param lockKey
* @param threadName
* @param retryTime
*/
private void threadTest(RedisDistributeLock redisDistributeLock, String lockKey, String threadName, int retryTime, int n) {
new Thread(new Runnable() {
@Override
public void run() {
String value = redisDistributeLock.tryLock(lockKey, 10_000, retryTime);
if (count.get() >= n) {
int left = count.addAndGet(-n);
System.out.println(threadName + "减库存,剩余: " + left + " 购买: " + n);
} else {
System.out.println(threadName + "库存不足下单失败,当前库存: " + count.get() + " 购买: " + n);
}
redisDistributeLock.release(lockKey, value);
}
}).start();
}

public Application(RedisDistributeLock redisDistributeLock) throws InterruptedException {
String lockKey = "lock_key";
Random random = new Random();
for (int i = 0; i < 30; i++) {
threadTest(redisDistributeLock, lockKey, "t-" + i, random.nextInt(30), random.nextInt(3) + 1);
}
Thread.sleep(20 * 1000);
}

public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}

II. 其他

0. 项目

系列博文

工程源码

1. 一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

一灰灰blog


打赏 如果觉得我的文章对您有帮助,请随意打赏。
分享到