UP | HOME

groupcache源码阅读

Table of Contents

1 前言

groupcache是golang写的一个缓存库,因为是内存缓存,所以效率比较高。目前公司会用到,而且源码比较简单,所以饿着肚子通读了它的源码。 groupcache大概可以分为三层结构(自己分的,大雾),每层提供基本的功能,下层为上层提供服务。 第一层是lru包下的cache结构,这层提供了基本的cache功能,提供最近最少使用淘汰缓存数据。 第二层是groupcache.go文件中的cache结构,它是对lru.cache的分装,并提供安全并发功能,合一些基本的统计数据。 第三层是groupcache.go中的group结构, 它提供了简单的分布式缓存支持,flight缓存调用(对于并发的多次读取,只请求一次get,每个并发返回相同的数据)。

2 lru.cache结构

lru下的cache结构,数据存放在一个双向链表中,并提供一个map映射到key跟列表的元素,链表主要提供lru算法。map主要提供快速查找key。如下图所示, list是标准库中的container/list

// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
  // MaxEntries is the maximum number of cache entries before
  // an item is evicted. Zero means no limit.
  MaxEntries int

  // OnEvicted optionally specificies a callback function to be
  // executed when an entry is purged from the cache.
  OnEvicted func(key Key, value interface{})

  ll    *list.List
  cache map[interface{}]*list.Element
}


// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
  if c.cache == nil {
    c.cache = make(map[interface{}]*list.Element)
    c.ll = list.New()
  }
  if ee, ok := c.cache[key]; ok {
    //如果拿到key的数据,是热点数据,放在链表最前面,并更新值
    c.ll.MoveToFront(ee)
    ee.Value.(*entry).value = value
    return
  }
  //没有则存放在list中,并对map建立索引
  ele := c.ll.PushFront(&entry{key, value})
  c.cache[key] = ele
  if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
    c.RemoveOldest()
  }
}

// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
  if c.cache == nil {
    return
  }
  if ele, hit := c.cache[key]; hit {
    //如果能拿到,则是热点数据,移动到最前面。
    c.ll.MoveToFront(ele)
    return ele.Value.(*entry).value, true
  }
  return
}

其他的一些操作都非常简单,不一一列出.

3 cache结构

groupcache中的cache主要是加了并发安全,并添加一些统计数据, 一些操作都是直接调用lru.cache

type cache struct {
  mu         sync.RWMutex
  nbytes     int64 // of all keys and values
  lru        *lru.Cache
  nhit, nget int64
  nevict     int64 // number of evictions
}

func (c *cache) add(key string, value ByteView) {
  c.mu.Lock()
  defer c.mu.Unlock()
  if c.lru == nil {
    c.lru = &lru.Cache{
      OnEvicted: func(key lru.Key, value interface{}) {
    val := value.(ByteView)
    c.nbytes -= int64(len(key.(string))) + int64(val.Len())
    c.nevict++
      },
    }
  }
  c.lru.Add(key, value)
  c.nbytes += int64(len(key)) + int64(value.Len())
}

func (c *cache) get(key string) (value ByteView, ok bool) {
  c.mu.Lock()
  defer c.mu.Unlock()
  c.nget++
  if c.lru == nil {
    return
  }
  vi, ok := c.lru.Get(key)
  if !ok {
    return
  }
  c.nhit++
  return vi.(ByteView), true
}

4 group结构

group结构如下:

type Group struct {
  //group的名字,必须唯一
  name       string
  //getter方法,用于从缓存失效后从数据库或其他地方获取数据.
  getter     Getter
  //分布式支持
  peersOnce  sync.Once
  peers      PeerPicker
  cacheBytes int64 // limit for sum of mainCache and hotCache size

  //两个缓存
  // mainCache is a cache of the keys for which this process
  // (amongst its peers) is authoritative. That is, this cache
  // contains keys which consistent hash on to this process's
  // peer number.
  mainCache cache

  // hotCache contains keys/values for which this peer is not
  // authoritative (otherwise they would be in mainCache), but
  // are popular enough to warrant mirroring in this process to
  // avoid going over the network to fetch from a peer.  Having
  // a hotCache avoids network hotspotting, where a peer's
  // network card could become the bottleneck on a popular key.
  // This cache is used sparingly to maximize the total number
  // of key/value pairs that can be stored globally.
  hotCache cache

  // loadGroup ensures that each key is only fetched once
  // (either locally or remotely), regardless of the number of
  // concurrent callers.
  // 在缓存命中失败的时候减少调用
  loadGroup flightGroup

  _ int32 // force Stats to be 8-byte aligned on 32-bit platforms

  // Stats are statistics on the group.
  Stats Stats
}

先说flightGroup结构比较简单,这个结构主要是在缓存失效的时候,减少对底层的访问。比如一个缓存数据失效了,这个时候同时会有很多人调用接口,缓存都没有命中,就会对数据库发起很多次调用,其实这个时候只要调用一次就行了,其他的都是相同的数据。

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  g.mu.Lock()
  if g.m == nil {
    g.m = make(map[string]*call)
  }
  //对于同一个key的调用等待其他goroutine调用完成
  if c, ok := g.m[key]; ok {
    g.mu.Unlock()
    c.wg.Wait()
    //直接返回数据
    return c.val, c.err
  }
  //同个key的调用放在map中
  c := new(call)
  c.wg.Add(1)
  g.m[key] = c
  g.mu.Unlock()

  //获取数据
  c.val, c.err = fn()
  c.wg.Done()

  //删除限制
  g.mu.Lock()
  delete(g.m, key)
  g.mu.Unlock()

  return c.val, c.err
}

来看看group的get方法是怎么使用的

func (g *Group) Get(ctx Context, key string, dest Sink) error {
  g.peersOnce.Do(g.initPeers)
  g.Stats.Gets.Add(1)
  if dest == nil {
    return errors.New("groupcache: nil dest Sink")
  }
  //在缓存中查看key
  value, cacheHit := g.lookupCache(key)
  if cacheHit {
    g.Stats.CacheHits.Add(1)
    return setSinkView(dest, value)
  }

  // Optimization to avoid double unmarshalling or copying: keep
  // track of whether the dest was already populated. One caller
  // (if local) will set this; the losers will not. The common
  // case will likely be one caller.
  destPopulated := false
  //如果没有在缓存中找到数据,就从getter方法中load进来
  value, destPopulated, err := g.load(ctx, key, dest)
  if err != nil {
    return err
  }
  if destPopulated {
    return nil
  }
  return setSinkView(dest, value)
}

func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
  //这个方法比较简单,从是从maincache和hotcache中读取数据
  if g.cacheBytes <= 0 {
    return
  }
  value, ok = g.mainCache.get(key)
  if ok {
    return
  }
  value, ok = g.hotCache.get(key)
  return
}
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
  g.Stats.Loads.Add(1)
  //loadGroup减少对底层的调用,上面已经说了
  viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
    if value, cacheHit := g.lookupCache(key); cacheHit {
      g.Stats.CacheHits.Add(1)
      return value, nil
    }
    g.Stats.LoadsDeduped.Add(1)
    var value ByteView
    var err error
    //如果能从远程获取,就从分布式的其他机子获取就从其他机子获取,因为其他机器也是缓存数据比数据库快
    if peer, ok := g.peers.PickPeer(key); ok {
      value, err = g.getFromPeer(ctx, peer, key)
      if err == nil {
    g.Stats.PeerLoads.Add(1)
    return value, nil
      }
      g.Stats.PeerErrors.Add(1)
      // TODO(bradfitz): log the peer's error? keep
      // log of the past few for /groupcachez?  It's
      // probably boring (normal task movement), so not
      // worth logging I imagine.
    }
    //调用getter方法,获取数据(从数据库,或者其他地方)
    value, err = g.getLocally(ctx, key, dest)
    if err != nil {
      g.Stats.LocalLoadErrs.Add(1)
      return nil, err
    }
    g.Stats.LocalLoads.Add(1)
    destPopulated = true // only one caller of load gets this return value
    //把数据存放在cache中
    g.populateCache(key, value, &g.mainCache)
    return value, nil
  })
  if err == nil {
    value = viewi.(ByteView)
  }
  return
}

5 分布式结构

上面提到了从分布式机子获取缓存使用的是getFromPeer方法,下面是源码

//每一个分布式的服务都需要实现一个Get方法,接口描述文件在proto文件中
func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
  req := &pb.GetRequest{
    Group: &g.name,
    Key:   &key,
  }
  res := &pb.GetResponse{}
  //从远端得到数据
  err := peer.Get(ctx, req, res)
  if err != nil {
    return ByteView{}, err
  }
  value := ByteView{b: res.Value}
  // TODO(bradfitz): use res.MinuteQps or something smart to
  // conditionally populate hotCache.  For now just do it some
  // percentage of the time.
  //哈哈,这里随机放在hotCache中,有意思
  if rand.Intn(10) == 0 {
    g.populateCache(key, value, &g.hotCache)
  }
  return value, nil
}

然后groupcache实现了一个Get接口,源码在http.go中, 有兴趣的可以看看。这里就不加了。

6 总结

所以现在可以看到一次groupcache的读取过程如下:

  1. 从mainCache中读取
  2. 从hotCache中读取
  3. 从分布式集群中的其他机器读取
  4. 从底层(数据库或其他地方)读取

不过groupcache只支持get方法,你不能自己控制cache过期。这点不大方便,因为我们总希望实时获取数据。