分布式锁总结
Table of Contents
1 前言
在微服务架构中,由于服务是多机部署的,所以常常需要分布式锁来进行互斥操作。比如一些服务中小的定时任务,是随服务一起启动的,但是由于多机部署,如果不做处理,会同时有多个进程在处理,会有一些麻烦。在之前我一般使用的redis来维护这样一个状态。但是之前看到公司大佬有用到consul来实现(原来consul api已经自带了), 所以总结下相关内容。对于分布式锁的实现需要满足以下一些条件:
- 同时只有一个client可以获取一个锁。
- client异常退出不会影响锁的释放。
- 锁只能由获取锁的client释放。
要满足条件一,需要一个标志位,而且标志位的操作必须是原子的。要满足条件二, 得给锁一个占用时间,过期自动释放。要满足条件三,client端能检测标志位是否由自己设置。
2 redis实现
由于redis是在后台服务中用的比较多的一个组件,所以很多情况下可以使用redis来实现分布式锁是很方便的。
2.1 版本一:
lock: 通过SET KEY VALUE NX 来获取锁
unlock: 通过DEL KEY 来释放锁
通过设置NX标志保证唯一性。但是这样有个问题,就是没有满足条件二。如果释放锁失败了(client crash了或者网络出现了问题),key就会一直在redis中。之后的client就不能再获取了。所以为了能够自动释放需要加上超时设置。
2.2 版本二:
lock: SET KEY VALUE NX PX 3000
unlock: DEL KEY 这个时候其实还是没有完美解决锁超时的问题,比如:获取锁的服务需要的锁的时间比你设置key的过期时间还要长,这时候锁就被自动释放就有可能被其他client获取到。通常做法是开一个goroutine或者线程在要过期的时候刷新锁的过期时间。
extern: SET KEY VALUE PX 3000 但是这里又出现了一个小问题,extern这个操作并不知道是谁设置了这个锁, 有可能锁过期了,还傻乎乎的以为自己占用了锁,一直延长锁的过期时间。也就是没满足条件三,client只能操作自己获取的锁。而且这里unlock操作也没有判断是否是自己的锁。如果使用方操作不当的话很容易删掉别人的key。所以这两个地方需要加上check,只有在是自己的锁的情况下才能操作,这里可以利用VALUE来存储当前占有锁是哪个client。所以需要一个clientid。还有需要注意的地方就是check跟unlock,check跟extern的操作是原子的,不然一切都没了意义,一般情况下使用redis lua脚本来实现,由于redis是单线程,所以能保证原子性。
2.3 版本三:
lock: SET KEY VALUE NX PX 3000
unlock:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
extern:
if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("SET", KEYS[1], ARGV[1], "XX", "PX", ARGV[2]) else return 0 end
版本三就是一个可以在通常情况下使用的分布式锁了。但是还有点小问题,如果redis crash了咋办?这里我们只使用了一个redis。而redis集群是异步同步,有可能丢失数据,也有可能存在时间差,在这种情况下都有可能被其他client获取到lock。当然redis官方提供了一个redlock的算法,利用多台redis master, 每次lock,unlock,extern都需要大多是redis节点操作成功才算成功,复杂度也提升了很多,可以参考这里的实现:https://github.com/go-redsync/redsync。 还有要注意的地方就是最好操作实现幂等性,防止锁出现未知问题。
3 consul 实现
对于consul来说,需要关注的点是一样的。只不过在consul有不同的抽象。consul中使用kv跟session来实现。session用来标识是哪个client获取了lock。consul比较好的一点是它是强一致的集群。consul服务的稳定性有保证。 参考官方文档: https://www.consul.io/docs/guides/leader-election.html 在官方的sdk中已经实现了一个可以直接使用的分布式锁,操作如下: lock:
func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { // Hold the lock as we try to acquire l.l.Lock() defer l.l.Unlock() // Check if we already hold the lock if l.isHeld { return nil, ErrLockHeld } // Check if we need to create a session first // 如果没有session,创建一个,用于标识获取锁的对象和过期管理 l.lockSession = l.opts.Session if l.lockSession == "" { s, err := l.createSession() if err != nil { return nil, fmt.Errorf("failed to create session: %v", err) } l.sessionRenew = make(chan struct{}) l.lockSession = s session := l.c.Session() // 在没释放锁之前一直刷新session的时间 go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew) // If we fail to acquire the lock, cleanup the session defer func() { if !l.isHeld { close(l.sessionRenew) l.sessionRenew = nil } }() } // Setup the query options kv := l.c.KV() qOpts := &QueryOptions{ WaitTime: l.opts.LockWaitTime, } start := time.Now() attempts := 0 WAIT: // Check if we should quit select { case <-stopCh: return nil, nil default: } // Handle the one-shot mode. if l.opts.LockTryOnce && attempts > 0 { elapsed := time.Since(start) if elapsed > qOpts.WaitTime { return nil, nil } qOpts.WaitTime -= elapsed } attempts++ // Look for an existing lock, blocking until not taken pair, meta, err := kv.Get(l.opts.Key, qOpts) if err != nil { return nil, fmt.Errorf("failed to read lock: %v", err) } if pair != nil && pair.Flags != LockFlagValue { return nil, ErrLockConflict } locked := false if pair != nil && pair.Session == l.lockSession { goto HELD } if pair != nil && pair.Session != "" { qOpts.WaitIndex = meta.LastIndex goto WAIT } //获取一个lock pair = l.lockEntry(l.lockSession) locked, _, err = kv.Acquire(pair, nil) if err != nil { return nil, fmt.Errorf("failed to acquire lock: %v", err) } // Handle the case of not getting the lock if !locked { // Determine why the lock failed qOpts.WaitIndex = 0 pair, meta, err = kv.Get(l.opts.Key, qOpts) if pair != nil && pair.Session != "" { //If the session is not null, this means that a wait can safely happen //using a long poll qOpts.WaitIndex = meta.LastIndex goto WAIT } else { // If the session is empty and the lock failed to acquire, then it means // a lock-delay is in effect and a timed wait must be used select { case <-time.After(DefaultLockRetryTime): goto WAIT case <-stopCh: return nil, nil } } } HELD: // Watch to ensure we maintain leadership leaderCh := make(chan struct{}) //一直监视锁的占用者,当锁的占用者不是当前session,close掉leaderCh go l.monitorLock(l.lockSession, leaderCh) // Set that we own the lock l.isHeld = true // Locked! All done return leaderCh, nil }
unlock:
//unlock操作比较简单 func (l *Lock) Unlock() error { // Hold the lock as we try to release l.l.Lock() defer l.l.Unlock() // Ensure the lock is actually held if !l.isHeld { return ErrLockNotHeld } // Set that we no longer own the lock l.isHeld = false // Stop the session renew // 停止延长session的时间 if l.sessionRenew != nil { defer func() { close(l.sessionRenew) l.sessionRenew = nil }() } // Get the lock entry, and clear the lock session lockEnt := l.lockEntry(l.lockSession) l.lockSession = "" // Release the lock explicitly //直接调用release释放掉lock kv := l.c.KV() _, _, err := kv.Release(lockEnt, nil) if err != nil { return fmt.Errorf("failed to release lock: %v", err) } return nil }
从上可以看到consul的实现还是比较严谨的。比如不停的监视锁的占用者。比如在锁释放后才停止延长session的周期(这里要注意的是就算锁释放失败了也不代表你占用了锁,因为已经停止了session的刷新, 锁会被过期自动释放掉,所以最好是在调用之后不再使用互斥操作, 但是可以多次调用unlock)
4 参考资料
redis官方介绍: https://redis.io/topics/distlock reids分布式锁go实现: https://github.com/go-redsync/redsync consul信号量: https://www.consul.io/docs/guides/semaphore.html consul排他锁: https://www.consul.io/docs/guides/leader-election.html