1 先来了解下分布式锁
1.1 什么是分布式锁
分布式锁,即分布式系统中的锁,我们通过锁解决 控制共享资源访问 的问题,来保证只有一个线程可以访问被保护的资源。
1.2 分布式锁的实现方案
- 基于数据库实现分布式锁
- 基于Zookeeper实现分布式锁
- 基于Redis实现分布式锁
等等,本篇基于Redis角度进行讨论
1.3 分布式锁满足哪些特性
- 互斥性:在分布式系统下,一个事件在同一个时间内只能被一个线程执行,即只能有一个线程持有锁。
- 安全性:可以方便的获取锁和释放锁,不产生死锁情况
- 过期性:具备锁失效机制,即可以在时效预期外自动解锁,防止死锁
- 可重入:具备可重入特性(可理解为重新进入,由多于一个任务并
- 高性能:高性能的获取锁与释放锁
- 高可用性:高可用的获取锁与释放锁
1.4 互斥特性
1.4.1 实现互斥特性
1.4.1.1 SETNX命令
SETNX 是 set if not exists 的缩写,当且仅当 key 不存在时,则设置 value 给这个key。若给定的 key 已经存在,则 SETNX 不做任何动作。
命令的返回值说明:
- 1:说明该进程获得锁,将 key 的值设为 value
- 0:说明其他进程已经获得了锁,进程不能进入临界区。
举例说明:setnx lock.key lock.value
> SETNX lock.user_063105015 1
(integer) 1 # 获取编号为 063105015 用户成功
如果已经被获取过了,则获取失败
> SETNX lock.user_063105015 1
(integer) 0 # 获取编号为 063105015 用户失败
1.4.1.2 get命令
获取key的值,如果存在,则返回;如果不存在,则返回nil
# 获取成功
> GET lock.user_063105015
"1"
# 获取失败
> GET lock.user_123456789
(nil)
1.4.1.3 getset命令
原子的设置值的办法,对key设置newValue这个值,并且返回key原来的旧值。
# 重置用户信息
> getset lock.user_063105015 0
"1" # 原值为1
# 再次重置
> getset lock.user_063105015 1
"0" # 原值为0
1.4.1.4 删除命令,用完之后进行锁释放
> DEL lock.user_063105015
(integer) 1
具体执行流程如下:
1.3.1.5 异常导致的锁释放问题
可能会因为一些场景,造成锁无法释放,如下:
- 调用服务或者客户端崩溃,无法正确的处理锁释放的工作。
- 业务程序的异常执行,没有操作释放锁的 DEL指令。
这种情况下,锁就会一直占用着,不会被释放,其他线程也无法获得。所以必须得有个自动释放锁的过程。
1.4.2 超时释放
超时释放其实就是重置,目的是避免因为各种原因导致的锁长时间无法释放。
做法就是我们给锁加个过期时间(EXPIRE Time):
# 给用户 063105015 加锁
> SETNX lock.user_063105015 1
(integer) 1
# 设置过期时间,到时间没删除则自动释放
> EXPIRE lock.user_063105015 120 # 120秒之后自动释放
(integer) 1
为了保证执行时的原子性,Redis 官方扩展了 SET 命令,既能满足获取对象,又能保证设置超时的时间语义。
避免出现了获取锁完成之后,执行超时设置失败微软无法释放锁的情况。保证要么都成功,要么都不执行。
# 示例如下:
SET lock.user_063105015 1 NX PX 60000
- NX:就是Not Exist,表示只有用户编号为 063105015 不存在的时候才可以 SET 成功,并且只有单个线程可以获取锁;
- PX 60000:表示对这个锁设置一个60s的过期时间。
1.4.3 对锁进行唯一标识
经常会出现一种情况,就是你获取到锁之后,因为各种原因(比如你的服务线程故障、网络抖动 等等),没有执行完成,或者没有释放锁,
这时候锁也过了 EXPIRE TIME,就自动释放了。当另外一个线程开锁成功,你的线程响应过来了,把人家的锁给释放了,这样就有问题了。
为了避免这种操作,我们要对同一个的锁做唯一识别码,在释放锁之前,先判断下是不是自己设置的那个锁,如下:
# 设置10086专用值
> SET lock.user_063105015 10086 NX PX 60000
OK
# 设置成功,获取检查确实是10086
> get lock.user_063105015
"10086"
# 伪代码:删除前进项确认是不是自己加的那个锁
if ( redis.get("lock.user_063105015").equals("10086")) {
redis.del("lock.user_063105015"); // 只有对比成功才进行删除,释放锁
}
1.4.4 实现可重入锁
可重入锁可以理解为重新进入,由多于一个任务并发使用,而不必担心数据错误。
- 可重入性就就保证线程能继续执行,防止在同一线程中多次获取锁而导致死锁发生
- 不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行
这边说说可重入锁,比如你执行线程的方案a获取锁之后,你的a方法后,线程继续执行b方法也需要获取锁,如果这时候不可重入,
线程就需要等待锁的释放,进入争抢。
这边的解法就是对线程加锁的锁值进行增减,同一个线程的方法遇到加锁则锁值+1,遇到退锁则锁值-1,当前仅当锁值=0的时候,说明这个锁真正的被释放了。
Java中的Redisson 类库就是通过 Redis Hash 来实现可重入锁。
加锁的逻辑
我们可以使用 Redis hash 结构实现,key 表示被锁的共享资源, hash 结构的 fieldKey 的 value 则保存加锁的次数。
实现如下( KEYS1 = "lock.user_063105015", ARGV [10000,uuid):
KEYS[1] = key的值
ARGV[1]) = 持有锁的时间
ARGV[2] = getLockName(threadId) 下面id就算系统在启动的时候会全局生成的uuid 来作为当前进程的id,加上线程id就是getLockName(threadId)了,可以理解为:进程ID+系统ID = ARGV[2]
# 1 为 true
# 0 为 false
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
参数说明
- hincrby :将hash中指定域的值增加给定的数字
- pexpire:设置key的有效时间以毫秒为单位
- hexists:判断field是否存在于hash中
- pttl:获取key的有效毫秒数
程序说明
- Redis exists 命令判断 lock.user_063105015 锁是否存在
- 锁不存在,hincrby 创建一个键为 lock.user_063105015 的 hash 表,键为 uuid,初始化值为 0,然后再次加 1,最后设置过期时间。
- 锁存在,hexists判断 lock 对应的 hash 表中是否存在 uuid 键,存在则 + 1,并重置过期时间
- 不符合以上的条件的都走到默认返回