FreeCache

FreeCache 是一个 Go 语言的缓存库,无额外的 GC 负荷。数百万对象的垃圾收集延迟仅在数百毫秒。您可以在内存中缓存无限数量的对象,而不会增加延迟和降低吞吐量。

特征

  • 可存储数以百万计条目
  • 零垃圾收集负荷
  • 高并发而且线程安全的访问
  • 纯 Go 语言实现
  • 支持对象失效
  • 近乎 LRU 的算法
  • 严格限制内存使用
  • 提供一个测试用的服务器,支持一些基本 Redis 命令

案例

cacheSize := 1024*1024
cache := freecache.NewCache(cacheSize)
key := []byte("abc")
val := []byte("def")
expire := 60 // expire in 60 seconds
cache.Set(key, val, expire)
got, err := cache.Get(key)
if err != nil {
    fmt.Println(err)
} else {
    fmt.Println(string(got))
}
affected := cache.Del(key)
fmt.Println("deleted key ", affected)
fmt.Println("entry count ", cache.EntryCount())

注意事项

  • 内存是预先分配的
  • 如果你分配的内存非常大,那么应该设置 debug.SetGCPercent() 到一个很小的比例来获得正常的 GC 频率

如何做到的

FreeCache通过减少指针数来避免GC开销。 无论存储多少个条目,只有512个指针。 数据集通过密钥的哈希值分片为256个段。 每个段只有两个指针,一个是存储键和值的环形缓冲区,另一个是用于查找条目的索引片。 每个段都有自己的锁,因此它支持高并发访问。

案例解释

// 设置开始内存大小
cacheSize := 1024*1024
// NewCache按大小返回一个新初始化缓存。
// 缓存大小至少设置为512KB。
// 如果大小设置得比较大,你应该打电话
// `debug.SetGCPercent()`,将其设置为更小的值
// 限制内存消耗和GC暂停时间。
cache := freecache.NewCache(cacheSize)

key := []byte("abc")
val := []byte("def")
expire := 60 // expire in 60 seconds

// Set设置缓存条目的key,值和过期,并将其存储在缓存中。
// 如果密钥大于65535或者值大于缓存大小的1/1024,
// 该条目不会写入缓存。 expireSeconds <= 0表示没有过期,
// 但是当缓存已满时可以将其逐出。
cache.Set(key, val, expire)
// 获取返回值或未找到错误。
got, err := cache.Get(key)
if err != nil {
    fmt.Println(err)
} else {
    fmt.Println(string(got))
}

affected := cache.Del(key)
fmt.Println("deleted key ", affected)
// EntryCount返回当前缓存中的项目数。
fmt.Println("entry count ", cache.EntryCount())

源码分析

依赖说明

本项目主要依赖项目:xxhash 具体使用参考 xxhash 源码分析

文件结构

cache.go
cache_test.go
segment.go
ringbuf.go
ringbuf_test.go
iterator.go

具体代码

从 cache.go 开始,主要是对外方法

声明的常量

const (
    // segmentCount表示freecache实例中的段数
    segmentCount    = 256
    // segmentAndOpVal按位并应用于hashVal以查找段ID
    segmentAndOpVal = 255
    // 最小声明内存大小
    minBufSize      = 512 * 1024
)

Cache 结构体

type Cache struct {
	locks    [segmentCount]sync.Mutex
	segments [segmentCount]segment
}

// segment.go 文件中 segment 结构体
type segment struct 

xxhash Sum64 计算

func hashFunc(data []byte) uint64 {
	return xxhash.Sum64(data)
}

初始化一个内存分配

func NewCache(size int) (cache *Cache) {
	if size < minBufSize { // 判断初始化空间是不是小于最小化设置
		size = minBufSize
	}
	cache = new(Cache)
	for i := 0; i < segmentCount; i++ { // 循环分配内存变量
		cache.segments[i] = newSegment(size/segmentCount, i)
	}
	return
}

设置一个键值,里面还有其他的设置方法例如:

  • func (cache *Cache) SetInt(key int64, value []byte, expireSeconds int) (err error)
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
	hashVal := hashFunc(key) // 调用key 的 hash 生成
	segID := hashVal & segmentAndOpVal
	cache.locks[segID].Lock()
	err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
	cache.locks[segID].Unlock()
	return
}

获取一个键值,里面还有其他的方法例如:

  • func (cache *Cache) GetInt(key int64) (value []byte, err error)
func (cache *Cache) Get(key []byte) (value []byte, err error) {
	hashVal := hashFunc(key)
	segID := hashVal & segmentAndOpVal
	cache.locks[segID].Lock()
	value, _, err = cache.segments[segID].get(key, hashVal)
	cache.locks[segID].Unlock()
	return
}

其他可能会用到的方法

  • func (cache *Cache) HitCount() (count int64) HitCount是一个指标值,用于返回在缓存中找到密钥的次数
  • func (cache *Cache) MissCount() (count int64) MissCount是一个指标值,用于返回缓存中未命中的次数
  • func (cache *Cache) LookupCount() int64 LookupCount是一个度量值,它返回给定键的查找发生的次数
  • func (cache *Cache) HitRate() float64 HitRate是命中率与查找率之比
  • func (cache *Cache) OverwriteCount() (overwriteCount int64)

有啦上面的一些方法我们就可以比较容易的基于一些监控提交做监控报表,例如:写一个 Prometheus exporter 就可以做监控啦

下面看看核心存储如何做的,来看 segment.go

const HASH_ENTRY_SIZE = 16 	// 没有看到哪里有用到?
const ENTRY_HDR_SIZE = 24	// byte 什么默认大小

// 环形缓冲区中的条目头结构,后跟键和值。
type entryHdr struct {
	accessTime uint32
	expireAt   uint32
	keyLen     uint16
	hash16     uint16
	valLen     uint32
	valCap     uint32
	deleted    bool
	slotId     uint8
	reserved   uint16
}

segment 结构体说明

// 一个段包含256个槽,一个槽是由hash16值排序的入口指针数组
// 可以通过密钥的哈希值查找条目。
type segment struct {
	rb            RingBuf // ring buffer that stores data
	segId         int
	_             uint32
	missCount     int64
	hitCount      int64
	entryCount    int64
	totalCount    int64      // 环缓冲区中的条目数,包括已删除
	totalTime     int64      // 用于计算最近最少使用的条目.
	totalEvacuate int64      // used for debug
	totalExpired  int64      // used for debug
	overwrites    int64      // used for debug
	vacuumLen     int64      // 直到 vacuumLen,可以写入新数据而不会覆盖旧数据
	slotLens      [256]int32 // 每个插槽的实际长度
	slotCap       int32      // 插槽可以容纳的最大入口指针数
	slotsData     []entryPtr // 所有256个插槽
}

newSegment 初始化声明内存空间

func newSegment(bufSize int, segId int) (seg segment) {
	seg.rb = NewRingBuf(bufSize, 0)
	seg.segId = segId
	seg.vacuumLen = int64(bufSize)
	seg.slotCap = 1
	seg.slotsData = make([]entryPtr, 256*seg.slotCap)
	return
}

顺着这个进入 NewRingBuf

// 当数据超过时,环形缓冲区具有固定大小
// 大小,旧数据将被新数据覆盖。
// 它只包含从开始到结束的流中的数据
type RingBuf struct {
	begin int64 // beginning offset of the data stream.
	end   int64 // ending offset of the data stream.
	data  []byte
	index int //range from '0' to 'len(rb.data)-1'
}
// 声明 RingBuf size 为之前传递的大小(size/segmentCount)或最小值
func NewRingBuf(size int, begin int64) (rb RingBuf) {
	rb.data = make([]byte, size) // 声明一个空值
	rb.begin = begin
	rb.end = begin
	rb.index = 0
	return
}

声明完成后顺着这个顺序进入 segment:Set 和 Get 方法

func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
	if len(key) > 65535 { // 对大 key 限制
		return ErrLargeKey
	}
	maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE
	if len(key)+len(value) > maxKeyValLen { // 限制值得大小
		// Do not accept large entry.
		return ErrLargeEntry
	}
	now := uint32(time.Now().Unix()) // 获取当前时间戳
	expireAt := uint32(0)
	if expireSeconds > 0 {
		expireAt = now + uint32(expireSeconds)
	}

	slotId := uint8(hashVal >> 8)
	hash16 := uint16(hashVal >> 16)

	var hdrBuf [ENTRY_HDR_SIZE]byte // 声明默认大小
	hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))

	slotOff := int32(slotId) * seg.slotCap
	slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] // 获取槽的位置
	idx, match := seg.lookup(slot, hash16, key)
	if match {
		matchedPtr := &slot[idx]
		seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
		hdr.slotId = slotId
		hdr.hash16 = hash16
		hdr.keyLen = uint16(len(key))
		originAccessTime := hdr.accessTime
		hdr.accessTime = now
		hdr.expireAt = expireAt
		hdr.valLen = uint32(len(value))
		if hdr.valCap >= hdr.valLen {
			//in place overwrite
			atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
			seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
			seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
			atomic.AddInt64(&seg.overwrites, 1)
			return
		}
		// avoid unnecessary memory copy.
		seg.delEntryPtr(slotId, hash16, slot[idx].offset)
		//seg.delEntryPtr(slotId, hash16, seg.slotsData[idx].offset)
		match = false
		// increase capacity and limit entry len.
		for hdr.valCap < hdr.valLen {
			hdr.valCap *= 2
		}
		if hdr.valCap > uint32(maxKeyValLen-len(key)) {
			hdr.valCap = uint32(maxKeyValLen - len(key))
		}
	} else {
		hdr.slotId = slotId
		hdr.hash16 = hash16
		hdr.keyLen = uint16(len(key))
		hdr.accessTime = now
		hdr.expireAt = expireAt
		hdr.valLen = uint32(len(value))
		hdr.valCap = uint32(len(value))
		if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
			hdr.valCap = 1
		}
	}

	entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
	slotModified := seg.evacuate(entryLen, slotId, now)
	if slotModified {
		// the slot has been modified during evacuation, we need to looked up for the 'idx' again.
		// otherwise there would be index out of bound error.
		slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
		idx, match = seg.lookup(slot, hash16, key)
		// assert(match == false)
	}
	newOff := seg.rb.End()
	seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
	seg.rb.Write(hdrBuf[:])
	seg.rb.Write(key)
	seg.rb.Write(value)
	seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
	atomic.AddInt64(&seg.totalTime, int64(now))
	atomic.AddInt64(&seg.totalCount, 1)
	seg.vacuumLen -= entryLen
	return
}

func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint32, err error) {
	slotId := uint8(hashVal >> 8)
	hash16 := uint16(hashVal >> 16)
	slotOff := int32(slotId) * seg.slotCap
	var slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
	idx, match := seg.lookup(slot, hash16, key)
	if !match {
		err = ErrNotFound
		atomic.AddInt64(&seg.missCount, 1)
		return
	}
	ptr := &slot[idx]
	now := uint32(time.Now().Unix())

	var hdrBuf [ENTRY_HDR_SIZE]byte
	seg.rb.ReadAt(hdrBuf[:], ptr.offset)
	hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
	expireAt = hdr.expireAt

	if hdr.expireAt != 0 && hdr.expireAt <= now {
		seg.delEntryPtr(slotId, hash16, ptr.offset)
		atomic.AddInt64(&seg.totalExpired, 1)
		err = ErrNotFound
		atomic.AddInt64(&seg.missCount, 1)
		return
	}
	atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime))
	hdr.accessTime = now
	seg.rb.WriteAt(hdrBuf[:], ptr.offset)
	value = make([]byte, hdr.valLen)

	seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
	atomic.AddInt64(&seg.hitCount, 1)
	return
}

func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
	slotId := uint8(hashVal >> 8)
	hash16 := uint16(hashVal >> 16)
	slotOff := int32(slotId) * seg.slotCap
	slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
	idx, match := seg.lookup(slot, hash16, key)
	if !match {
		return false
	}
	ptr := &slot[idx]
	seg.delEntryPtr(slotId, hash16, ptr.offset)
	return true
}

总结

流程总结

申请内存空间->初始化槽位和值->设置键值->获取键值

代码总结

项目核心文件有 4 个:

  • cache.go 主要方法
  • iterator.go 迭代
  • segment.go 存储操作
  • ringbuf.go 环形存储