groupcache

最近无业游民就像看看现在招聘Go的需求看到有些公司有对 groupcache 要求,于是兴趣来就看啦下

当前项目 Star 7w+

项目介绍

groupcache存储的是kv结构,同是memcache作者出品,官方github上说明如下:

groupcache is a caching and cache-filling library, intended as a replacement for memcached in many cases.

groupcache是一个缓存和缓存库,在许多情况下用作memcached的替代品。

这个框架的适用场景并不多,因为groupcache只能get,不能update和delete,也不能设置过期时间,只能通过lru淘汰最近最少访问的数据;

groupcache还是有它的优点的,groupcache既是服务器,也是客户端,当在本地groupcache缓存中没有查找的数据时,通过一致性哈希,查找到该key所对应的peer服务器,在通过http协议,从该peer服务器上获取所需要的数据;还有一点就是当多个客户端同时访问memcache中不存在的键时,会导致多个客户端从mysql获取数据并同时插入memcache中,而在相同情况下,groupcache只会有一个客户端从mysql获取数据,其他客户端阻塞,直到第一个客户端获取到数据之后,再返回给多个客户端;

groupcache是一个缓存库,也就是说不是一个完整的软件,需要自己实现main函数。

代码

package consistenthash

提供环形哈希的实现。

type Hash func(data []byte) uint32

type Map struct {
	hash     Hash
	replicas int
	keys     []int // Sorted
	hashMap  map[int]string
}

// 使用案例
// Override the hash function to return easier to reason about values. Assumes
// the keys can be converted to an integer.
hash := New(3, func(key []byte) uint32 {
	i, err := strconv.Atoi(string(key))
	if err != nil {
		panic(err)
	}
	return uint32(i)
})
// 具体库调用位置 http.go 109 和 121
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)

Add 向哈希添加一些键。

// Adds some keys to the hash.
// 向哈希添加一些键。
func (m *Map) Add(keys ...string) {
	for _, key := range keys {
		for i := 0; i < m.replicas; i++ {
			hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
			m.keys = append(m.keys, hash)
			m.hashMap[hash] = key
		}
    }
    // 重新排序
	sort.Ints(m.keys)
}

Get 获取散列中与提供的键最接近的项。

// Gets the closest item in the hash to the provided key.
// 获取散列中与提供的键最接近的项。
func (m *Map) Get(key string) string {
	if m.IsEmpty() {
		return ""
	}
	hash := int(m.hash([]byte(key)))
	// Binary search for appropriate replica.
	idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
	// Means we have cycled back to the first replica.
	if idx == len(m.keys) {
		idx = 0
	}
	return m.hashMap[m.keys[idx]]
}

package lru

LRU 算法实现,基于 container/list

结构体

// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
	// MaxEntries is the maximum number of cache entries before
	// an item is evicted. Zero means no limit.
	MaxEntries int

	// OnEvicted optionally specifies a callback function to be
	// executed when an entry is purged from the cache.
	OnEvicted func(key Key, value interface{})

	ll    *list.List
	cache map[interface{}]*list.Element
}

// A Key may be any value that is comparable.
type Key interface{}
  • Add 添加缓存,如果存在更换位置
  • Get 获取一个缓存,并且,并且更新位置
  • Remove 移出制定缓存
  • RemoveOldest 删除最旧的缓存
  • Clear 清空缓存
  • Len 获取长度

package singleflight

提供重复的函数调用抑制

主要结构体

// call is an in-flight or completed Do call
type call struct {
    wg  sync.WaitGroup
    val interface{}
    err error
}
// 控制单位结构
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

具体控制实现方法

// 执行当前Group的单元控制
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error)

调用使用位置:

// 只有这一个地方调用
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
    viewi, err := g.loadGroup.Do(key, func() (interface{}, error)
    ...
}

// 上层调用方法为
func (g *Group) Get(ctx Context, key string, dest Sink) error 

package groupcachepb

grpc 接口声明

核心 groupcache.go

提供了一个带缓存的数据加载机制和重复数据删除适用于一组对等进程。

声明方法

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	return newGroup(name, cacheBytes, getter, nil)
}

调用案例

var stringcache = groupcache.NewGroup("SlowDBCache", 64<<20, groupcache.GetterFunc(
    func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
        result := client.Get(key) // 客户端调用,参考下面案例链接地址
        fmt.Printf("asking for %s from dbserver\n", key)
        dest.SetBytes([]byte(result))
        return nil
}))
// 获取缓存方法
func (g *Group) Get(ctx Context, key string, dest Sink) error

案例参考

Playing with groupcache

grpc 接口调用

参考资料