Pebble 源码剖析-写入流程(续)

上一章,分析了 Pebble 写入的整体流程,并深入分析了 Pipeline 的三个执行阶段。但还未对 large batch 和 makeRoomForWrite 具体执行逻辑进行分析,本章将对这两点内容展开讨论。

Large Batch

这里首先思考两个问题:为什么需要区分 large batch,为什么需要对 large batch 特殊处理?笔者最初以比较疑惑,带着疑问向社区提了 issue ,大家可以看下官方开发人员的回答。

回顾下上一章 DB.Apply 方法部分代码:

	if int(batch.memTableSize) >= d.largeBatchThreshold {
		batch.flushable = newFlushableBatch(batch, d.opts.Comparer)
	}

这里会判断 batch 的大小,如果其超过设定的阈值,则将该 batch 当做 large batch 处理。这里会调用 newFlushableBatch 方法,根据 batch 生成一个 flushable(可以理解成 immutable memtable)。下面看一下 newFlushableBatch 具体的逻辑:

该函数代码比较冗长,这里省略部分分支逻辑,保留主要逻辑

func newFlushableBatch(batch *Batch, comparer *Comparer) *flushableBatch {
	b := &flushableBatch{
		data:      batch.data,
		cmp:       comparer.Compare,
		formatKey: comparer.FormatKey,
		offsets:   make([]flushableBatchEntry, 0, batch.Count()),
	}
	
	if len(b.data) > batchHeaderLen {
		// Non-empty batch.
		var index uint32
    // 迭代 batch
		for iter := BatchReader(b.data[batchHeaderLen:]); len(iter) > 0; index++ {
			offset := uintptr(unsafe.Pointer(&iter[0])) - uintptr(unsafe.Pointer(&b.data[0]))
      // 解析当前 batch record
			kind, key, _, ok := iter.Next()
			if !ok {
				break
			}
			entry := flushableBatchEntry{
				offset: uint32(offset),
				index:  uint32(index),
			}
      
			entry.keyStart = uint32(uintptr(unsafe.Pointer(&key[0])) -
				uintptr(unsafe.Pointer(&b.data[0])))
			entry.keyEnd = entry.keyStart + keySize
			b.offsets = append(b.offsets, entry)
		}
	}

	pointOffsets := b.offsets
	sort.Sort(b)
	
	return b
}

首先,初始 flushableBatch **b **,b 中包含了原始 batch 的数据;然后迭代 batch,解析 batch 的每个 record 生成 flushableBatchEntry,entry 主要包含 record 在 batch 中的偏移 offset,record 是 batch 索引号 index(用于计算当前 record 的 seqNum),record 的原始 key 在 batch 中的起始偏移和终止位置。可以看到 entry 主要记录的是 record 相关的位置和偏移信息,根据这些信息可以得到 record 中的 key 和 value。将 entry 加入到 b 的 offsets 中,迭代完毕后,b 便拥有了原始 batch 中所有 record 的位置和偏移信息。

最后对 b 进行排序,排序规则参考如下函数:

func (b *flushableBatch) Less(i, j int) bool {
	ei := &b.offsets[i]
	ej := &b.offsets[j]
	ki := b.data[ei.keyStart:ei.keyEnd]
	kj := b.data[ej.keyStart:ej.keyEnd]
	switch c := b.cmp(ki, kj); {
	case c < 0:
		return true
	case c > 0:
		return false
	default:
		return ei.offset > ej.offset
	}
}

可以看到 flushableBatch 的排序规则:比较 entry 间 key 的大小,比较函数默认为 bytes.Compare,如果 key 相同则偏移位置大的 key 排前面(偏移越大说明 key 是后写入,代表值越新)。

好的,到这里 flushableBatch 便准备完毕了,并会赋值给 batch 的 flushable。再回顾 DB.commitWrite,上一章把方法中 large batch 相关的逻辑省略了,如下:

	repr := b.Repr()
	if b.flushable != nil {
		b.flushable.setSeqNum(b.SeqNum())
		if !d.opts.DisableWAL {
			var err error
			size, err = d.mu.log.SyncRecord(repr, syncWG, syncErr)
			if err != nil {
				panic(err)
			}
		}
	}

接着之前的逻辑,这里 batch 的 flushable 不为空,如果启用了 wal,这些会先写 wal,而后再调用 makeRoomForWrite。这里和普通 batch 的处理顺序相反,后面会做解释。这里先看 makeRoomForWrite,该方法主要的逻辑有如下几点:

  • 当前 memtable 空间足够,直接写入
  • 当前 memtable 空间不够,将当前 memtable 切换为 immutable memtable,然后将当前 memtable 刷盘
  • batch 为空或者为 large batch 则直接切换当前 memtable(注:这两种视为非常规 batch)
  • 如果切换 memtable 则同时会生成新的 log 文件

上述为 makeRoomForWrite 的主要逻辑,还要一些分支逻辑需要结合代码来看看(log 文件相关的代码省略):

func (d *DB) makeRoomForWrite(b *Batch) error {
	force := b == nil || b.flushable != nil
	stalled := false
	for {
		// 检查当前 memtable 是否正在切换中
		if d.mu.mem.switching {
			d.mu.mem.cond.Wait()
			continue
		}
		// batch 为正常小数据量时,进入 prepare
		if b != nil && b.flushable == nil {
			err := d.mu.mem.mutable.prepare(b)
			if err != arenaskl.ErrArenaFull {
				if stalled {
					d.opts.EventListener.WriteStallEnd()
				}
				return err
			}
		}
		// force || err == ErrArenaFull, so we need to rotate the current memtable.
		{
			var size uint64
			// 计算所有 memtable 大小
			for i := range d.mu.mem.queue {
				size += d.mu.mem.queue[i].totalBytes()
			}
			// 总大小超过阈值,需要阻塞写,等待 compact 完成
			if size >= uint64(d.opts.MemTableStopWritesThreshold)*uint64(d.opts.MemTableSize) {
				if !stalled {
					stalled = true
					d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
						Reason: "memtable count limit reached",
					})
				}
				d.mu.compact.cond.Wait()
				continue
			}
		}
		l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
		// l0 读放大超过阈值需要阻塞等待
		if l0ReadAmp >= d.opts.L0StopWritesThreshold {
			// There are too many level-0 files, so we wait.
			if !stalled {
				stalled = true
				d.opts.EventListener.WriteStallBegin(WriteStallBeginInfo{
					Reason: "L0 file count limit exceeded",
				})
			}
			d.mu.compact.cond.Wait()
			continue
		}

		if b != nil && b.flushable != nil {
			entry := d.newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
			entry.releaseMemAccounting = d.opts.Cache.Reserve(int(b.flushable.totalBytes()))
			d.mu.mem.queue = append(d.mu.mem.queue, entry)
			imm.logNum = 0
		}

		var logSeqNum uint64
		if b != nil {
			logSeqNum = b.SeqNum()
			if b.flushable != nil {
				logSeqNum += uint64(b.Count())
			}
		} else {
			logSeqNum = atomic.LoadUint64(&d.mu.versions.atomic.logSeqNum)
		}

		var entry *flushableEntry
		d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
		d.mu.mem.queue = append(d.mu.mem.queue, entry)
		d.updateReadStateLocked(nil)
		if immMem.writerUnref() {
			d.maybeScheduleFlush()
		}
		force = false
	}
}

可以看到,这里有两种情况会强制切换 memtable,一种是 batch 为空,代表手动 flush,另一种是 batch 为 large batch。下面循环中,首先检查当前 memtable 是否正在切换中,如果是则等待当前 memtable 切换完毕;

随后判断当前 batch 是否为常规 batch,如果为常规 batch ,则调用 memtable 的 prepare 函数判断当前 batch 空间是否足够,如果足够则直接返回,否则返回 ErrArenaFull,代表空间已满;

下面进入切换 memtable 的逻辑,到这里可以看出,有三种情况会切换 memtable:1. 手动 flush,2. Large batch 3. 当前 memtable 已满;接下来会计算当前内存所有 memtable 的空间大小,如果总大小超过停写阈值,则会阻塞写,等待 compact 完成;再接下来会判断 L0 的读放大是否超过阈值,如果超过则阻塞写,等待 compact 完成;

如果是 large batch,则会生成 flushableEntry,然后添加到 immutable queue 中;最后会将当前 memtable 切换为 immutable 并加入到 queue 中。最后将 immutable 解引用,并调用 maybeScheduleFlush 触发写入操作。

思考

这里思考几个问题:

  1. 为什么需要设计 large batch
  2. large batch 的日志写入为什么在 makeRoomForWrite 之前(和普通 batch 对比)
  3. large batch 为什么会触发 memtable 切换

这几个问题的解答可以参考笔者向官方提的 issue 以及 官方文档 对 large batch 来源说明。这里笔者参考官方的回答及自己的理解,对上述问题进行解答下。

第一个问题,根据官方描述,pebble 的 memtable 实际上是一个预分配的固定内存大小的 skiplist,因此当 batch 超过 memtable 内存大小时,无法扩容,只能将 large batch 转变为 flushable 来处理,这样间接解决 memtable 内存无法容纳 large batch 的问题。再一个原因是,如果当前 large batch 过大即使未超过 memtable 内存,将 large batch 写入 memtable,那么很快就会导致 memtable full,触发 flush,既然 large batch 很快便会触发 flush,那么不如直接将其转变为 flushable,这样也避免了 large batch 到 memtable 的拷贝开销。

第二个问题,large batch 转变为 flushable 后,在 makeRoomForWrite 中会将其当做 immutable 加入到 immutable queue 中,相当于数据写入内存中,同时可能触发 flush,因此,在数据写入内存前应该先写 log,也就是日志的写入需要在 makeRoomForWrite 前。这个逻辑和普通的 batch 处理不同,可以参考上一章。

第三个问题,既然 large batch 并未写入到当前的 memtable 中,为什么也会将当前的 memtable 切换为 immutable?这个问题要和第二个问题结合起来看,从第二个问题可以知道,large batch 和当前 memtable 其实是共用的同一个 wal,在 makeRoomForWrite 中可能会触发 large batch 的 flushable 刷盘,刷盘后其对应的 wal 理应被删除以避免日志 replay 的开销,因此 memtable 也会被牵连一起刷盘。

总结

本章主要讲解了写入流程中 pebble 对 large batch 的特殊处理方式,同时分析了 makeRoomForWrite 的处理逻辑。最后分析了为什么会产生 large batch 及针对 large batch 特殊处理的原因。