groupcache源码阅读
Table of Contents
1 前言
groupcache是golang写的一个缓存库,因为是内存缓存,所以效率比较高。目前公司会用到,而且源码比较简单,所以饿着肚子通读了它的源码。 groupcache大概可以分为三层结构(自己分的,大雾),每层提供基本的功能,下层为上层提供服务。 第一层是lru包下的cache结构,这层提供了基本的cache功能,提供最近最少使用淘汰缓存数据。 第二层是groupcache.go文件中的cache结构,它是对lru.cache的分装,并提供安全并发功能,合一些基本的统计数据。 第三层是groupcache.go中的group结构, 它提供了简单的分布式缓存支持,flight缓存调用(对于并发的多次读取,只请求一次get,每个并发返回相同的数据)。
2 lru.cache结构
lru下的cache结构,数据存放在一个双向链表中,并提供一个map映射到key跟列表的元素,链表主要提供lru算法。map主要提供快速查找key。如下图所示, list是标准库中的container/list
// Cache is an LRU cache. It is not safe for concurrent access. type Cache struct { // MaxEntries is the maximum number of cache entries before // an item is evicted. Zero means no limit. MaxEntries int // OnEvicted optionally specificies a callback function to be // executed when an entry is purged from the cache. OnEvicted func(key Key, value interface{}) ll *list.List cache map[interface{}]*list.Element } // Add adds a value to the cache. func (c *Cache) Add(key Key, value interface{}) { if c.cache == nil { c.cache = make(map[interface{}]*list.Element) c.ll = list.New() } if ee, ok := c.cache[key]; ok { //如果拿到key的数据,是热点数据,放在链表最前面,并更新值 c.ll.MoveToFront(ee) ee.Value.(*entry).value = value return } //没有则存放在list中,并对map建立索引 ele := c.ll.PushFront(&entry{key, value}) c.cache[key] = ele if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { c.RemoveOldest() } } // Get looks up a key's value from the cache. func (c *Cache) Get(key Key) (value interface{}, ok bool) { if c.cache == nil { return } if ele, hit := c.cache[key]; hit { //如果能拿到,则是热点数据,移动到最前面。 c.ll.MoveToFront(ele) return ele.Value.(*entry).value, true } return }
其他的一些操作都非常简单,不一一列出.
3 cache结构
groupcache中的cache主要是加了并发安全,并添加一些统计数据, 一些操作都是直接调用lru.cache
type cache struct { mu sync.RWMutex nbytes int64 // of all keys and values lru *lru.Cache nhit, nget int64 nevict int64 // number of evictions } func (c *cache) add(key string, value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = &lru.Cache{ OnEvicted: func(key lru.Key, value interface{}) { val := value.(ByteView) c.nbytes -= int64(len(key.(string))) + int64(val.Len()) c.nevict++ }, } } c.lru.Add(key, value) c.nbytes += int64(len(key)) + int64(value.Len()) } func (c *cache) get(key string) (value ByteView, ok bool) { c.mu.Lock() defer c.mu.Unlock() c.nget++ if c.lru == nil { return } vi, ok := c.lru.Get(key) if !ok { return } c.nhit++ return vi.(ByteView), true }
4 group结构
group结构如下:
type Group struct { //group的名字,必须唯一 name string //getter方法,用于从缓存失效后从数据库或其他地方获取数据. getter Getter //分布式支持 peersOnce sync.Once peers PeerPicker cacheBytes int64 // limit for sum of mainCache and hotCache size //两个缓存 // mainCache is a cache of the keys for which this process // (amongst its peers) is authoritative. That is, this cache // contains keys which consistent hash on to this process's // peer number. mainCache cache // hotCache contains keys/values for which this peer is not // authoritative (otherwise they would be in mainCache), but // are popular enough to warrant mirroring in this process to // avoid going over the network to fetch from a peer. Having // a hotCache avoids network hotspotting, where a peer's // network card could become the bottleneck on a popular key. // This cache is used sparingly to maximize the total number // of key/value pairs that can be stored globally. hotCache cache // loadGroup ensures that each key is only fetched once // (either locally or remotely), regardless of the number of // concurrent callers. // 在缓存命中失败的时候减少调用 loadGroup flightGroup _ int32 // force Stats to be 8-byte aligned on 32-bit platforms // Stats are statistics on the group. Stats Stats }
先说flightGroup结构比较简单,这个结构主要是在缓存失效的时候,减少对底层的访问。比如一个缓存数据失效了,这个时候同时会有很多人调用接口,缓存都没有命中,就会对数据库发起很多次调用,其实这个时候只要调用一次就行了,其他的都是相同的数据。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } //对于同一个key的调用等待其他goroutine调用完成 if c, ok := g.m[key]; ok { g.mu.Unlock() c.wg.Wait() //直接返回数据 return c.val, c.err } //同个key的调用放在map中 c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() //获取数据 c.val, c.err = fn() c.wg.Done() //删除限制 g.mu.Lock() delete(g.m, key) g.mu.Unlock() return c.val, c.err }
来看看group的get方法是怎么使用的
func (g *Group) Get(ctx Context, key string, dest Sink) error { g.peersOnce.Do(g.initPeers) g.Stats.Gets.Add(1) if dest == nil { return errors.New("groupcache: nil dest Sink") } //在缓存中查看key value, cacheHit := g.lookupCache(key) if cacheHit { g.Stats.CacheHits.Add(1) return setSinkView(dest, value) } // Optimization to avoid double unmarshalling or copying: keep // track of whether the dest was already populated. One caller // (if local) will set this; the losers will not. The common // case will likely be one caller. destPopulated := false //如果没有在缓存中找到数据,就从getter方法中load进来 value, destPopulated, err := g.load(ctx, key, dest) if err != nil { return err } if destPopulated { return nil } return setSinkView(dest, value) } func (g *Group) lookupCache(key string) (value ByteView, ok bool) { //这个方法比较简单,从是从maincache和hotcache中读取数据 if g.cacheBytes <= 0 { return } value, ok = g.mainCache.get(key) if ok { return } value, ok = g.hotCache.get(key) return } func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { g.Stats.Loads.Add(1) //loadGroup减少对底层的调用,上面已经说了 viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { if value, cacheHit := g.lookupCache(key); cacheHit { g.Stats.CacheHits.Add(1) return value, nil } g.Stats.LoadsDeduped.Add(1) var value ByteView var err error //如果能从远程获取,就从分布式的其他机子获取就从其他机子获取,因为其他机器也是缓存数据比数据库快 if peer, ok := g.peers.PickPeer(key); ok { value, err = g.getFromPeer(ctx, peer, key) if err == nil { g.Stats.PeerLoads.Add(1) return value, nil } g.Stats.PeerErrors.Add(1) // TODO(bradfitz): log the peer's error? keep // log of the past few for /groupcachez? It's // probably boring (normal task movement), so not // worth logging I imagine. } //调用getter方法,获取数据(从数据库,或者其他地方) value, err = g.getLocally(ctx, key, dest) if err != nil { g.Stats.LocalLoadErrs.Add(1) return nil, err } g.Stats.LocalLoads.Add(1) destPopulated = true // only one caller of load gets this return value //把数据存放在cache中 g.populateCache(key, value, &g.mainCache) return value, nil }) if err == nil { value = viewi.(ByteView) } return }
5 分布式结构
上面提到了从分布式机子获取缓存使用的是getFromPeer方法,下面是源码
//每一个分布式的服务都需要实现一个Get方法,接口描述文件在proto文件中 func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) { req := &pb.GetRequest{ Group: &g.name, Key: &key, } res := &pb.GetResponse{} //从远端得到数据 err := peer.Get(ctx, req, res) if err != nil { return ByteView{}, err } value := ByteView{b: res.Value} // TODO(bradfitz): use res.MinuteQps or something smart to // conditionally populate hotCache. For now just do it some // percentage of the time. //哈哈,这里随机放在hotCache中,有意思 if rand.Intn(10) == 0 { g.populateCache(key, value, &g.hotCache) } return value, nil }
然后groupcache实现了一个Get接口,源码在http.go中, 有兴趣的可以看看。这里就不加了。
6 总结
所以现在可以看到一次groupcache的读取过程如下:
- 从mainCache中读取
- 从hotCache中读取
- 从分布式集群中的其他机器读取
- 从底层(数据库或其他地方)读取
不过groupcache只支持get方法,你不能自己控制cache过期。这点不大方便,因为我们总希望实时获取数据。