Java手写分布式锁的实现

Part1前言

  • 随着互联网业务的发展,原本单机部署的系统演化成如今的分布式集群系统后,由于分布式系统多线程,多进程并且分布在不同的机器上,这会使原本的单机锁失效,而且单纯的Java API并不能提供分布式锁的能力,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

  • 本文将从实现本地锁所产生的问题入手,从而介绍分布式锁主流的实现方案,重点实现基于Redis的分布式锁。

  • 相信我,往下看,你也会有所收获。如果观点有分歧,欢迎评论讨论。

  • 本地锁会出现的问题(此篇幅代码图片过多,因此放在最后)

Part2分布式锁的实现

1分布式锁主要的实现有

  • 基于数据库实现分布式锁

  • 基于缓存(Redis等,本文基于Redis实现手写分布式锁,因为这样可以更好的理解分布式锁的原理及实现,当然也可以使用Redisson)

  • 基于Zookeeper

2每种分布式锁的解决方案都有各自的优缺点

  • 性能角度:redis > zk > mysql

  • 安全角度:zk > redis == mysql

  • 难易程度:zk > redis > mysql

3分布式锁要具备的特点

  • 独占排他互斥

    • 可以通过 setnx (redis命令:执行多次,只有一次能够成功)
    • set key value ex 3 nx
  • 防死锁发生

    • 请求获取到锁之后,服务器挂掉了,导致锁无法释放:给lock锁添加过期时间
    1. 可以通过redis命令 expire
    2. 或者通过 set key value ex 3 nx
  • 保证原子性

    • redis是单线程的,接受或者执行指令遵循one-by-one原则。只要指令之间不被插入其他指令即可保证原子性,lua脚本批量发送多个指令给redis服务器,lua脚本也可以实现一些业务逻辑,redis集成了lua脚本,可以直接使用eval指令执行lua脚本。
    1. 获取锁和设置过期时间之间
    2. 判断和删除之间:lua脚本
  • 防误删:

    • uuid给每个线程的锁添加唯一标识
  • 自动续期

  • 可重入:hash数据结构 + lua脚本

  • 集群情况下,可能导致锁失效:RedLock算法(redis特有的)

    • 一个请求从主中获取到锁,从还没来得及同步数据,主就挂掉了,从就升级为新主,新的请求就可以从新主中获取锁

4基于redis分布式锁的基本实现

我们可以借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false),流程图如下图所示:

  • 多个客户端同时尝试获取锁(setnx)

  • 获取成功,执行业务逻辑,执行完成释放锁(del)

  • 其他客户端等待重试

图片
  • 代码实现:
public void testLock() {
    // 1. 从redis中获取锁,setnx
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111");
    if (lock) {
        // 查询redis中的num值
        String value = this.redisTemplate.opsForValue().get("num");
        // 没有该值return
        if (StringUtils.isBlank(value)){
            return ;
        }
        // 有值就转成成int
        int num = Integer.parseInt(value);
        // 把redis中的num值+1
        this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
        // 2. 释放锁 del
        this.redisTemplate.delete("lock");
    } else {
        // 3. 每隔1秒钟回调一次,再次尝试获取锁
        try {
            Thread.sleep(1000);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

那么以上代码是否可以解决全部问题呢? 显示是不能的,我们假设setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放,怎么办呢?

5优化分布式锁_设置过期时间

设置过期有俩种方式可以选择:

  • 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

  • 在set时指定过期时间(推荐)

图片代码实现优化就是在设置锁的时候设置过期时间:

public void testLock() {
    // 1. 从redis中获取锁,setnx
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.MINUTES);
    if (lock) {
        //与之前相同代码略过
       ...
    }
}

那么还会不会存在问题呢?

场景:如果业务逻辑的执行时间是7s。执行流程如下:

  • index1业务逻辑没执行完,3秒后锁被自动释放。

  • index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

  • index3获取到锁,执行业务逻辑

  • index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

  • 最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

6优化分布式锁_防止误删除

图片
public void testLock() {
    // 1. 从redis中获取锁,setnx
    String uuid = UUID.randomUUID().toString();
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.MINUTES);
    if (lock) {
        //与之前相同代码略过
       ...
       // 2. 释放锁 del
       if (StringUtils.equals(redisTemplate.opsForValue().get("lock"),uuid)){
            this.redisTemplate.delete("lock");
        }
    }
}

场景:

  • index1执行删除时,查询到的lock值确实和uuid相等

  • index1执行删除前,lock刚好过期时间已到,被redis自动释放

  • index2获取了lock

  • index1执行删除,此时会把index2的lock删除

  • 问题:缺乏原子性

7优化分布式锁_LUA脚本保证删除的原子性

首先我们先简单介绍一下lua脚本的基本知识(lua脚本是c语言)定义变量:

  • 全局变量:a = 11
  • 局部变量:local b = 22
  • redis不允许lua脚本创建全局变量,只能声明局部变量

流程控制:

if(exp) then
业务逻辑
elseif(exp) then
业务逻辑
else
业务逻辑
end

redis中执行lua脚本:

  • eval script numkeys keys[] args[]  : eval指令的输出不是lua脚本的打印而是lua脚本的返回值
    • script:lua脚本字符串,定义动态变量:KEYS[1] ARGV[1]
    • numkeys:key数组的元素个数
    • keys:keys数组
    • args:argv数组

redis集群执行lua脚本可能会报错:如果所有keys不在同一个分片上,lua脚本就会报错:解决方案是:

  • keys只传一个
  • 可以使用CLUSTER KEYSLOT bb{xx}

删除LUA脚本:

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
public void testLock() {
    // 1. 从redis中获取锁,setnx
    String uuid = UUID.randomUUID().toString();
    Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
    if (lock) {
        //与之前相同代码略过
        ...
        // 2. 释放锁 del
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        this.redisTemplate.execute(new DefaultRedisScript<>(script), Arrays.asList("lock"), uuid);
    } else {
        // 3. 每隔1秒钟回调一次,再次尝试获取锁
        try {
            Thread.sleep(1000);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

8优化分布式锁_可以重入

上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。

可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。

我们基于Redis Hash 实现方案

Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。

  • 加锁
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
then
    redis.call('hincrby', KEYS[1], ARGV[1], 1);
    redis.call('expire', KEYS[1], ARGV[2]);
    return 1;
else
 return 0;
end

假设值为:KEYS:[lock]ARGV[uuid, expire]

如果锁不存在或者这是自己的锁,就通过hincrby(不存在新增,存在就加1)获取锁或者锁次数加1。 代码实例如下:

private Boolean tryLock(String lockName, String uuid, Long expire){
    String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) " +
        "then" +
        "    redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
        "    redis.call('expire', KEYS[1], ARGV[2]);" +
        "    return 1;" +
        "else" +
        "   return 0;" +
        "end";
    if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){
        try {
            // 没有获取到锁,重试
            Thread.sleep(200);
            tryLock(lockName, uuid, expire);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 获取到锁,返回true
    return true;
}
  • 解锁
-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
    return nil;
end;
-- 小于等于 0 代表可以解锁
if (redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
    return 0;
else
    redis.call('del', KEYS[1]);
    return 1;
end;

这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:

  • 1 代表解锁成功,锁被释放

  • 0 代表可重入次数被减 1

  • null 代表其他线程尝试解锁,解锁失败

如果返回值使用 Boolean,Spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断,所以返回类型只好使用 Long。

private void unlock(String lockName, String uuid){
    String script = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then" +
        "    return nil;" +
        "end;" +
        "if (redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then" +
        "    return 0;" +
        "else" +
        "    redis.call('del', KEYS[1]);" +
        "    return 1;" +
        "end;";
    // 这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:
    // 1 代表解锁成功,锁被释放
    // 0 代表可重入次数被减 1
    // null 代表其他线程尝试解锁,解锁失败
    Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Lists.newArrayList(lockName), uuid);
    // 如果未返回值,代表尝试解其他线程的锁
    if (result == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: "
                                               + lockName + " with request: "  + uuid);
    }
}
  • 使用
public void testLock() {
    // 加锁
    String uuid = UUID.randomUUID().toString();
    Boolean lock = this.tryLock("lock", uuid, 300l);
    if (lock) {
        // 读取redis中的num值
        String numString = this.redisTemplate.opsForValue().get("num");
        if (StringUtils.isBlank(numString)) {
            return;
        }
        // ++操作
        Integer num = Integer.parseInt(numString);
        num++;
        // 放入redis
        this.redisTemplate.opsForValue().set("num", String.valueOf(num));
        // 测试可重入性
        this.testSubLock(uuid);
        // 释放锁
        this.unlock("lock", uuid);
    }
}
// 测试可重入性
private void testSubLock(String uuid){
    // 加锁
    Boolean lock = this.tryLock("lock", uuid, 300l);
    if (lock) {
        System.out.println("分布式可重入锁。。。");
        this.unlock("lock", uuid);
    }
}

9优化分布式锁_自动续期

A线程超时时间设为10s(为了解决死锁问题),但代码执行时间可能需要30s,然后redis服务端10s后将锁删除。 此时,B线程恰好申请锁,redis服务端不存在该锁,可以申请,也执行了代码。

那么问题来了, A、B线程都同时获取到锁并执行业务逻辑,这与分布式锁最基本的性质相违背:在任意一个时刻,只有一个客户端持有锁(即独享排他)。

锁延期方法:开启子线程执行延期

/**
     * 锁延期
     * 线程等待超时时间的2/3时间后,执行锁延时代码,直到业务逻辑执行完毕,因此在此过程中,其他线程无法获取到锁,保证了线程安全性
     * @param lockName
     * @param expire 单位:毫秒
     */
private void renewTime(String lockName, String uuid, Long expire){
    String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1; else return 0; end";
    new Thread(() -> {
        while (this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Lists.newArrayList(lockName), uuid, expire.toString())){
            try {
                // 到达过期时间的2/3时间,自动续期
                Thread.sleep(expire / 3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}

获取锁成功后,调用延期方法给锁 定时延期:

private Boolean tryLock(String lockName, String uuid, Long expire){
    String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) " +
        "then" +
        "    redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
        "    redis.call('expire', KEYS[1], ARGV[2]);" +
        "    return 1;" +
        "else" +
        "   return 0;" +
        "end";
    if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, expire.toString())){
        try {
            // 没有获取到锁,重试
            Thread.sleep(200);
            tryLock(lockName, uuid, expire);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 锁续期
    this.renewTime(lockName, uuid, expire * 1000);
    // 获取到锁,返回true
    return true;
}

10优化分布式锁_Redlock算法

redis集群状态下的问题:

  • 客户端A从master获取到锁

  • 在master将锁同步到slave之前,master宕掉了。

  • slave节点被晋级为master节点

  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。
    安全失效 解决集群下锁失效,参照redis官方网站针对redlock文档:redis.io/topics/dist…[1]

11本地锁会出现的问题

我们知道java中有synchronized、lock锁、读写锁ReadWriteLock,众所周知这些锁都是本地锁。

提到锁就不得不提JUC:java.util.concurrent包,又称concurrent包。jdk1.5提供,为多线程高并发编程而提供的包,但此文章的场景是分布式场景,后续会出JUC的文章。

  • 简单的介绍一下synchronized及lock锁
    • synchronized是一个关键字,lock是一个接口,ReentrantLock是实现了lock接口的一个类
    • ReentrantLock:悲观的独占的互斥的排他的可公平可不公平的可重入锁
    • synchronized:悲观的独占的互斥的排他的非公平的可重入锁

准备

redis、ab工具(压测)

不使用任何锁的情况下

  • 我们首先创建一个测试方法,testNoLock
@GetMapping("/test")
public void testNoLock(){
    String count = (String) this.redisTemplate.opsForValue().get("count");
    if (count == null){
        //没有值直接返回
        return;
    }
    // 有值就转成成int
    int number = Integer.parseInt(count);
    // 把redis中的num值+1
    this.redisTemplate.opsForValue().set("count", String.valueOf(++number));
}

测试之前的查看值为1

@GetMapping("/getCount")
public String getCount(){
    String count = String.valueOf(this.redisTemplate.opsForValue().get("count"));
    return count; //1
}
  • 接下来使用ab压力测试工具
// ab  -n(一次发送的请求数)  -c(请求的并发数) 访问路径
ab -n100 -c50 http://127.0.0.1:8080/test/test

再次查询结果为6,说明问题很大

图片

使用本地锁

public synchronized void testNoLock(){
    String count = String.valueOf(this.redisTemplate.opsForValue().get("count"));
    if ("null".equals(count)){
        //没有值直接返回
        return;
    }
    // 有值就转成成int
    int number = Integer.parseInt(count);
    // 把redis中的num值+1
    this.redisTemplate.opsForValue().set("count", String.valueOf(++number));
}
  • 再次使用ab压力测试工具
ab -n100 -c50 http://127.0.0.1:8080/test/test

此次结果为106,说明结果是正确的,看样子结果是非常完美的,但是真的很完美吗?

图片

使用集群+本地锁

  • 我们只需要在idea中在启动俩个服务,修改端口号,三个运行实例的名称是相同的,并且网关的配置就是通过服务名在负载均衡,所以我们只需要访问网关,网关就会给我们做负载均衡了。
图片
  • 再次使用ab压力测试工具(将count重置为1)
ab -n100 -c50 http://127.0.0.1:8080/test/test

此次的结果为58!!!

图片

到此我们可以知道,本地锁是有局限性的。

参考资料

[1]redlock文档: https://redis.io/topics/distlock

来源|juejin.cn/post/7257882929718788157

扫码领红包

微信赞赏支付宝扫码领红包

发表回复

后才能评论