UP | HOME

分布式锁总结

Table of Contents

1 前言

在微服务架构中,由于服务是多机部署的,所以常常需要分布式锁来进行互斥操作。比如一些服务中小的定时任务,是随服务一起启动的,但是由于多机部署,如果不做处理,会同时有多个进程在处理,会有一些麻烦。在之前我一般使用的redis来维护这样一个状态。但是之前看到公司大佬有用到consul来实现(原来consul api已经自带了), 所以总结下相关内容。对于分布式锁的实现需要满足以下一些条件:

  1. 同时只有一个client可以获取一个锁。
  2. client异常退出不会影响锁的释放。
  3. 锁只能由获取锁的client释放。

要满足条件一,需要一个标志位,而且标志位的操作必须是原子的。要满足条件二, 得给锁一个占用时间,过期自动释放。要满足条件三,client端能检测标志位是否由自己设置。

2 redis实现

由于redis是在后台服务中用的比较多的一个组件,所以很多情况下可以使用redis来实现分布式锁是很方便的。

2.1 版本一:

lock: 通过SET KEY VALUE NX 来获取锁

unlock: 通过DEL KEY 来释放锁

通过设置NX标志保证唯一性。但是这样有个问题,就是没有满足条件二。如果释放锁失败了(client crash了或者网络出现了问题),key就会一直在redis中。之后的client就不能再获取了。所以为了能够自动释放需要加上超时设置。

2.2 版本二:

lock: SET KEY VALUE NX PX 3000

unlock: DEL KEY 这个时候其实还是没有完美解决锁超时的问题,比如:获取锁的服务需要的锁的时间比你设置key的过期时间还要长,这时候锁就被自动释放就有可能被其他client获取到。通常做法是开一个goroutine或者线程在要过期的时候刷新锁的过期时间。

extern: SET KEY VALUE PX 3000 但是这里又出现了一个小问题,extern这个操作并不知道是谁设置了这个锁, 有可能锁过期了,还傻乎乎的以为自己占用了锁,一直延长锁的过期时间。也就是没满足条件三,client只能操作自己获取的锁。而且这里unlock操作也没有判断是否是自己的锁。如果使用方操作不当的话很容易删掉别人的key。所以这两个地方需要加上check,只有在是自己的锁的情况下才能操作,这里可以利用VALUE来存储当前占有锁是哪个client。所以需要一个clientid。还有需要注意的地方就是check跟unlock,check跟extern的操作是原子的,不然一切都没了意义,一般情况下使用redis lua脚本来实现,由于redis是单线程,所以能保证原子性。

2.3 版本三:

lock: SET KEY VALUE NX PX 3000

unlock:

 if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end 

extern:

if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("SET", KEYS[1], ARGV[1], "XX", "PX", ARGV[2])
else
  return 0
end

版本三就是一个可以在通常情况下使用的分布式锁了。但是还有点小问题,如果redis crash了咋办?这里我们只使用了一个redis。而redis集群是异步同步,有可能丢失数据,也有可能存在时间差,在这种情况下都有可能被其他client获取到lock。当然redis官方提供了一个redlock的算法,利用多台redis master, 每次lock,unlock,extern都需要大多是redis节点操作成功才算成功,复杂度也提升了很多,可以参考这里的实现:https://github.com/go-redsync/redsync。 还有要注意的地方就是最好操作实现幂等性,防止锁出现未知问题。

3 consul 实现

对于consul来说,需要关注的点是一样的。只不过在consul有不同的抽象。consul中使用kv跟session来实现。session用来标识是哪个client获取了lock。consul比较好的一点是它是强一致的集群。consul服务的稳定性有保证。 参考官方文档: https://www.consul.io/docs/guides/leader-election.html 在官方的sdk中已经实现了一个可以直接使用的分布式锁,操作如下: lock:

func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
  // Hold the lock as we try to acquire
  l.l.Lock()
  defer l.l.Unlock()

  // Check if we already hold the lock
  if l.isHeld {
    return nil, ErrLockHeld
  }

  // Check if we need to create a session first
  // 如果没有session,创建一个,用于标识获取锁的对象和过期管理
  l.lockSession = l.opts.Session
  if l.lockSession == "" {
    s, err := l.createSession()
    if err != nil {
      return nil, fmt.Errorf("failed to create session: %v", err)
    }

    l.sessionRenew = make(chan struct{})
    l.lockSession = s
    session := l.c.Session()
    // 在没释放锁之前一直刷新session的时间
    go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)

    // If we fail to acquire the lock, cleanup the session
    defer func() {
      if !l.isHeld {
    close(l.sessionRenew)
    l.sessionRenew = nil
      }
    }()
  }

  // Setup the query options
  kv := l.c.KV()
  qOpts := &QueryOptions{
    WaitTime: l.opts.LockWaitTime,
  }

  start := time.Now()
  attempts := 0
WAIT:
  // Check if we should quit
  select {
  case <-stopCh:
    return nil, nil
  default:
  }

  // Handle the one-shot mode.
  if l.opts.LockTryOnce && attempts > 0 {
    elapsed := time.Since(start)
    if elapsed > qOpts.WaitTime {
      return nil, nil
    }

    qOpts.WaitTime -= elapsed
  }
  attempts++

  // Look for an existing lock, blocking until not taken
  pair, meta, err := kv.Get(l.opts.Key, qOpts)
  if err != nil {
    return nil, fmt.Errorf("failed to read lock: %v", err)
  }
  if pair != nil && pair.Flags != LockFlagValue {
    return nil, ErrLockConflict
  }
  locked := false
  if pair != nil && pair.Session == l.lockSession {
    goto HELD
  }
  if pair != nil && pair.Session != "" {
    qOpts.WaitIndex = meta.LastIndex
    goto WAIT
  }

  //获取一个lock
  pair = l.lockEntry(l.lockSession)
  locked, _, err = kv.Acquire(pair, nil)
  if err != nil {
    return nil, fmt.Errorf("failed to acquire lock: %v", err)
  }

  // Handle the case of not getting the lock
  if !locked {
    // Determine why the lock failed
    qOpts.WaitIndex = 0
    pair, meta, err = kv.Get(l.opts.Key, qOpts)
    if pair != nil && pair.Session != "" {
      //If the session is not null, this means that a wait can safely happen
      //using a long poll
      qOpts.WaitIndex = meta.LastIndex
      goto WAIT
    } else {
      // If the session is empty and the lock failed to acquire, then it means
      // a lock-delay is in effect and a timed wait must be used
      select {
      case <-time.After(DefaultLockRetryTime):
    goto WAIT
      case <-stopCh:
    return nil, nil
      }
    }
  }

HELD:
  // Watch to ensure we maintain leadership
  leaderCh := make(chan struct{})
  //一直监视锁的占用者,当锁的占用者不是当前session,close掉leaderCh
  go l.monitorLock(l.lockSession, leaderCh)

  // Set that we own the lock
  l.isHeld = true

  // Locked! All done
  return leaderCh, nil
}

unlock:

//unlock操作比较简单
func (l *Lock) Unlock() error {
  // Hold the lock as we try to release
  l.l.Lock()
  defer l.l.Unlock()

  // Ensure the lock is actually held
  if !l.isHeld {
    return ErrLockNotHeld
  }

  // Set that we no longer own the lock
  l.isHeld = false

  // Stop the session renew
  // 停止延长session的时间
  if l.sessionRenew != nil {
    defer func() {
      close(l.sessionRenew)
      l.sessionRenew = nil
    }()
  }

  // Get the lock entry, and clear the lock session
  lockEnt := l.lockEntry(l.lockSession)
  l.lockSession = ""

  // Release the lock explicitly
  //直接调用release释放掉lock
  kv := l.c.KV()
  _, _, err := kv.Release(lockEnt, nil)
  if err != nil {
    return fmt.Errorf("failed to release lock: %v", err)
  }
  return nil
}

从上可以看到consul的实现还是比较严谨的。比如不停的监视锁的占用者。比如在锁释放后才停止延长session的周期(这里要注意的是就算锁释放失败了也不代表你占用了锁,因为已经停止了session的刷新, 锁会被过期自动释放掉,所以最好是在调用之后不再使用互斥操作, 但是可以多次调用unlock)

4 参考资料