leveldb 的 Compaction
概述
本文讲述 leveldb 的 compaction,包括为什么要 Compation,策略(size 模式和 seek 次数模式)及原因,怎么 Compaction。 Compaction 意即“压”,在 leveldb 中表示将内存中的 table 导入到磁盘和将低层次 sst 压入到高层次。
为何需要 Compaction
- 1.我们为了查询效率,需要将每一层所有的 sst 文件保持有序性,再通过记录这些 sst 文件的键值范围(即 sst 索引)。当查询键对应的值时,就可以在索引中使用二分查找,迅速定位到目标键所在的 sst 文件, 然后再展开查找。
- 2.正是因为这种有序性,在接收并插入一个数据时,需要将它插入到合适的 sst 文件中,当目标 sst 文件已经足够 大时,需要重新整理 sst 文件:拆分,重新组合这些 sst 文件。
- 3.更进一步地考虑,有一些“冷数据”,它们写入后很少被访问,它们不应该放在“热查找”位置,显然这些“热查找”位置应该是数据量少, 查找速度快的位置。这也是为什么 leveldb 规定,level+1 层的文件数量是 level 层的 10 倍(至于为什么是 10 倍的解释参见 sst 结构),即越低的层次越是“热查找”位置。
- 4.基于程序的局部性思考,我们有理由认为新插入的数据有可能会被立即访问,因此,这些新数据应该被放在“热查找”位置-低层次的 sst 文件上。
- 5.低层次 sst 容量较小,我们需要将一些数据换出到较高层次中。这即是 compaction 的动机。
Compaction 策略
leveldb 中 compaction 有两种策略,下面是较直观的解释:
size_compaction: 基于文件大小触发的 compaction. 这样做的动机是 leveldb 的层次控制: 高层的文件数量是低层的 10 倍
seek_compaction: 基于文件“被经过但不命中”的次数达到阈值触发的 compaction. 这样做的动机是消灭稀疏的文件, 将它放入更高层的文件中, 提高文件的稠密度提高查询效率. 文件太稀疏了: 文件的区间(smallest--largest)太大了, 含有的 kv 对只有那么多, 很多查找的键落在此范围但不被命中 这会形成查找的阻碍,将此文件填充到高一层文件中, 提高查询效率
优先执行的是 size_compaction。
Version 管理详细见相关章节。
Compaction 的输入文件
基本输入文件
根据 Compaction 策略,我们先初步求出需要对哪些文件进行 Compaction,它们是 Compaction 的输入文件。
1.size_compaction 策略 在 size_compaction 策略中,通过 compaction_level_ 指出应该在哪一层上进行 compact,而 compact_pointer_ 指出了在这层的哪个文件开始执行 compact:compact_pointer_ 指出了各层上一次 compact 的位置。此次 compact 的对象是这些位置后面的文件。若 compact_pointer_ 为空则从第0个文 件开始。 compaction_level_ 是在每次版本发生变化时(VersionEdit 被 Apply),动态计算出来的,依据是: a).第 0 层文件总大小达到阈值 b).第 n 层文件总大小达到阈值某个倍数(即 score,当前 leveldb 设定为 1) compact_pointer_ 正是在每次确认 compaction 输入文件后被更新。
2.seek_compaction 策略 这是一个更为智能但更复杂的策略。 在数据库 seek 过程中, 对于第一个被遍历到的文件,若只是经过它, 并没有在此文件中命中, 还要继续在其它文件中查找, 则此文件需要被记录下来, 用于后续的 compact。 这样做的依据是: 在查找元素时,此文件以“被经过但不命中”的方式多次访问, 次数多达 allowed_seeks 次, 那么说明此块对于全局的查找构成阻碍已经 到达了一个程度, 此文件需要被"消灭" -- compaction 到更高层,背后更深层次的原因是, 此文件太稀疏了: 文件的区间(smallest--largest)太大了, 很多查找的键落在此范围但不被命中,这会造成查找的效率低下,那么此文件应该被整理,直接填充到上一 level 文件, 使上一 level 文件更稠密. 此策略中,file_to_compact_level_, file_to_compact 指示了哪一层和哪一个文件需要被 compact。这两个值会在 DBImpl::Get 中被修改。
最后,输入文件分为 c[0] 和 c[1],表示第 level 层和 level+1 层的输入文件。
扩展输入文件
由上一点中的输入文件,扩大范围,求得最终的输入文件 1).若当前 Compaction 的 level 为0,则将与 c[0] 键值相交的第0层文件,加入到 c[0]。 2).否则:
- 2.1)由 c[0] 与 level+1 层相交的文件得到 c[1], c[0],c[1] 键值范围是 (all\_start, all\_limit)
- 2.2)若 c[1] 不为空则继续扩展: level 层与 (c[0] 并 c[1]) 相交的文件集合得到 expanded0, 转到 2.3);否则转到 2.5)
- 2.3)若 expanded0 的大小大于 c[0] 则继续扩展: level+1 层与 expanded0 相交的文件集合得到的 expanded1;否则转到 2.5)
- 2.4)若 expanded1 的大小大于 c[1] 则扩展至此, c[0]=expanded0, c[1]=expanded1, c[0] 范围是 (smallest, largest), c[0]并c[1]
范围是 (all\_start, all\_limit);否则转到 2.5)
- 2.5)计算 c[0]并c[1] 与 level+2 相交文件集合作为 c->grandparents\_,将 level 层下一次 compact 的起始位置设置为本次 level 层上
compact 的上界 c[0].largest
第2点如行云流水般的扩展输入文件的做法,本质上是:
- a).c[0]与 level+1 层的文件求相交得到 c[1]:是为了防止 level 层的文件 c[0] compact 到 level+1 中造成重叠,这会
导致逻辑错误,违背“除了第0层外所有层上的文件各不相交”,所以要将 c[1] 这些文件单独拿出来,与 c[0] 一起,compact 到 level+1 层
- b).因 a) 中已将 c[1] 单独拿出,所以 level 中与 c[1] 相交的文件可以 compact 到 level+1 层,这一部分 compact 到 level+1
不会与没有 compact 的部分发生重叠,这一部分加上c[0],记为 expanded0
- c).因 expanded0 将会 compact 到 level+1 中,所以需要找到它与 level+1 层的相交,同样它需要从 level+1 拿出,再 compact 回去,
这一部分加上c[1],记为 expanded1,
- d).若 expanded1 和 expanded0 较原来的 c[1], c[0] 都要大,则说明扩展是有效的,接下来便用 expanded0 和 expanded1 执行 compact。
Snapshot
在 compaction 这一章顺带讲 snapshot 是合适的,因为它与 compaction 关联实在太紧密了,而它单独成一章又无从讲起了。 snapshot,即快照,赋予了使用者查找在过去某一个瞬间数据库状态的能力:在那一时刻,整个 leveldb 有哪些数据,什么版本。 leveldb 会冗余地存储很多旧版本的数据。我们有可能误删了数据, 或者想追踪数据的变化历史,这时快照功能就十分有用了。 实现快照功能有以下两个主要问题:
- 1.前面说过,应该冗余存储旧版本数据。那么应该保留多老的数据呢? 联想到 leveldb 中任何一个数据(键值对)都有一个整数版本号 sequence number,我们也可以用一个整数来告诉 leveldb 对每个 user_key 都应该至少保留一个小于这个整数的版本(如果这个 user_key 有小于此整数的版本的话)。这个整数即是 snapshot。 只要 user_key 在那个时刻(产生 snapshot 这个版本的时刻)有版本(即 user_key 存在小于等于 snapshot 的版本), 则我们应该返回这个时刻当 时最新的版本,即 user_key 的所有版本中小于等于 snapshot 的最大版本。 snapshot 的值必然比当前 leveldb 最新版本号小:最新版本号时时刻刻都是整个 leveldb 中最大的版本号。
- 2.结合1,我们应该如何使用 snapshot 以对每个 user_key 保存一个小于等于它的版本呢?我们在每次新写入时不可能做到这点,最新版本号作为 snapshot 没有意义:违背了快照的初衷,快照是历史的某时刻状态,并非现在。除了新写入数据,后面唯一一个可以动数据的地方就是 compaction 了。 leveldb 通过在 compaction 时给出一个 snapshot,在执行 compaction 时,会为每个 user_key 保留一个小于等于 snapshot 的版本(如果有的话),具体 做法详见 怎样 Compact。
- 3.(犹豫良久还是决定加上这一点)实际上 compaction 时被指定的并非是单个的 snapshot,而是一个 snapshot 集合(链表),这是由于 snapshot 可以动态调整。当指定了 snapshot 集合时,因为保留的逻辑是:“为每个 user_key 保留一个小于等于 snapshot 的版本”,所以只需要取出集合中最小的 snapshot,执行此逻辑保留下来的各个 user_key 的版本范围是最广的,可以为所有的 snapshot 实现快照。
那么如何使用快照呢? -- 查询数据时,通过option传入snapshot参数,查找时会跳过版本号比 snapshot 大的键值对,定位到小于等于 snapshot 的最新版本, 从而读取那个时刻的历史数据。
怎样 Compact
依据上面两种 compaction 策略,选择出待 compaction 的文件(其实还有一些优化,详见 PickCompaction):level 层和 level+1 层,将这两个层次的 sst 文件包装为一个 MergingIterator,按序遍历键值对,一个一个判断,是 drop 掉,还是加入到新的 sst 文件中。
drop
- 1.对于当前遍历到的 user_key,已经为它保留了一个小于等于指定 snapshot 的版本,当前遍历的版本可以 drop。
- 2.首次遍历到的 user_key,不论版本号与 snapshot 关系如何,都应该保留下来。注意,即使当前版本是一个 deletion,也应保留。理由如下: a).这个 user_key 最新版本的值类型是 deleteType:则这个 user_key 在之后的遍历中都不应该再被查找到。但是如果删除了这个 deletion 版本,此 user_key 的旧版本会被遍历,而一旦这个旧版本是一个正常的 valueType,则它会被查找到返回给用户,这是逻辑错误。因此如果要删除这个 deletion 版本,则需要删除此 user_key 的所有版本:让这个 user_key 不可能被查到。基于这点,有以下两个分支判断: a-1).这个 deletion 版本的版本号小于等于 snapshot :删除所有的历史版本,相当于人为地扼杀了 user_key 本应该具有的快照功能。 a-2).这个 deletion 版本的版本号大于 snapshot,有以下两个分支判断: b-1)若后续的遍历中 user_key 存在小于等于 snapshot 的版本,则与上面 a-1) 一样,扼杀了快照功能。 b-2)若后续的遍历中 user_key 不存在小于等于 snapshot 的版本,并没有扼杀对于 snapshot 的快照功能。弊端不很明显,在于,compaction 指定的 是一个版本号数组,虽然没有扼杀最小的 snapshot 的快照功能,但是有可能扼杀较大的 snapshot 的快照功能。不扼杀的情况是:此 user_key 的最老 的版本号大于最大的 snapshot。但是对于其它的 user_key,可能仍然会扼杀快照功能。完全搞清楚:是否对于所有的 user_key 没有扼杀任何一个 snapshot 的快照功能,是一件非常繁杂的事情。因此简单而且稳妥起见,不要 drop 这个 deletion 版本即可。
综上,结论是:首次遍历到的 user_key 不可以 drop。
- 3.非首次遍历到的 user_key 是一个 deletion 版本,则 drop 依赖于以下两个条件满足: 1).当前 user_key 只出现在执行 compaction 的层次(设为 level) 和 level+1 层中 2).当前遍历的版本号小于等于 snapshot
原因如下: 1.当前是要将 level_ 层和 level_+1 层 compact,虽然在这两层将 user_key 的这个 deletion 版本删除了,但如果当前 user_key 还出现在比 level+1 层更高的层中,这些稍老的版本(层次越高数据越老越“冷”)会响应对 user_key 的查找,这将是逻辑错误。 2.若当前这个 deletion 版本号大于 snapshot 仍然将它 drop 掉,为了保持逻辑的正确性,还需要将此 user_key 后续版本全部 drop(第2点中讲过)。但 这样也有问题:会扼杀快照功能(上面详细阐述了这点)
将不可 drop 的键值对加入到新 sst 文件并写入到磁盘,再将它们纳入到 Version 管理,删除一些不会再引用到的文件,即完成了 compaction。 Version 管理详细见相关章节。其它细节见后面代码注释
加入新的 sst 文件
若将当前键值对加入到当前构建中的 sst 文件,导致它与 level+2 层文件有较多重叠,则应该放弃加入: 如果这个 sst 文件与 level+2 层有重叠程度较高(按 leveldb 预定义,和 level+2 达到多于 10 个文件重叠),则不能将这个文件直接放入 level+1 层中, 因为如果后续这个 level+1 层文件发生 compact,则我们需要对 level+2 层多于 10 个文件进行 compact,这是很高的耗费。 因此,我们要停止这个 sst 构建并导出到磁盘,开始新一个 sst 的构建。以上即是 ShouldStopBefore。 实际上,ShouldStopBefore 有两个优化:
- 1.计算 compaction 输入文件时,也一并计算出输入文件与 level+2 层的哪些文件相交:grandparents_,在判断 sst 与 level+2 重叠程度时,用这个更 小的集合去判断可以提高效率
- 2.判断 sst 与 grandparents_ 重叠程度时,并非一起判断,而是对 compaction 流程中依序处理到的键值对,逐个判断的:利用有序性,使得判断的复杂度 是 O(n),详见 ShouldStopBefore 的代码注释
Compact 优化
compaction 中有一种场景可以优化,使 compact 的速度大大提高:当输入文件 c[0] 只有一个文件,c[1] 为空时。也即整个 compaction 只需要处理 level 层的一个文件。当这个文件与 level+2 层重叠的程度较小时,我们直接将这个文件 compact 到 level+1 层即可。这个优化称为“IsTrivialMove”。
因此,正确的做法是,将这个 level 层的文件在 compact 时,拆分为多个文件放入 level+1 中,使得它们每一个与 level+2 层键重合的文件数量保持 最多10个。
DoCompactionWork
/************************************************************************/
/*
lzh: 压缩挑选出来的 level 层和 level+1 层文件集合: c[0], c[1]
leveldb 中的 snapshot 的功能是,
Get 时可以通过option传入snapshot参数,那么查找时会跳过SequenceNumber比snapshot大的键值对,定位到小于等于 snapshot 的最新版本,从而完成快照功能:读取历史数据。
对应地,compaction 应该有如下两个逻辑:
1.为每个 user_key 保留一个小于等于 snapshot 的最大版本号(如果有的话)。
2.最基本的,各个 user_key 要保留其最新的版本,不论这个版本是不是小于 snapshot。
由1,2两点,当 snapshot 没有被设置时,snapshot 应该被默认为整个 leveldb 中当前最大版本号。此时第1点等于无(此时它即是第2点)。
另外,一个解析出错的键值对后面紧挨着的键值对不会被 drop。
*/
/************************************************************************/
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1);
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == NULL);
assert(compact->outfile == NULL);
//lzh: 函数注释已经阐明 snapshot 没有被设置时,应该被设置为 leveldb 中当前最大版本号。
//lzh: 若有多个 snapshot 应该使用最小的那个,以保证更古老的版本数据能够被保存下来
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->number_;
}
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
//lzh: 得到遍历所有数据的 Iterator. 它将正序遍历所有的 Internalkey,相同的 user_key 越新的版本越前面被遍历到
Iterator* input = versions_->MakeInputIterator(compact->compaction);
//input = getTestIterator(compact->smallest_snapshot);
input->SeekToFirst();
Status status;
ParsedInternalKey ikey; //当前正遍历到的 internal key
std::string current_user_key; //当前遍历到的 user_key。
//lzh: current_user_key 是否有效。还没有开始遍历,或者解析失败时此值为 false
bool has_current_user_key = false;
//lzh: 上一次遍历到的当前 user_key 的 sequence number。
//lzh: 还没有开始遍历,或者解析失败时,或者当前的 user_key 是首次遍历到,则此值为 kMaxSequenceNumber。保证每第一次遍历到的 user_key 不被 drop
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
//lzh: 依次遍历, 若不 drop, 则 compact
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != NULL) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != NULL) {
CompactMemTable(); //lzh: 将内存中的 mem 导出到磁盘上的 sst 文件(作为第 0 层)
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key = input->key();
//lzh: 是否应该停止往当前构建的 sst 中增加 key. 若返回 true 则需要将当前的构建写入到磁盘然后再重新开始一个 sst 的构建
//lzh: 首次调用 ShouldStopBefore 必然返回 false.
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else
{
// lzh: 首次遇到 ikey.user_key 这个 user_key
if (!has_current_user_key || //lzh: !has_current_user_key 表示上一个 ikey 解析失败或者当前是第一次解析
user_comparator()->Compare(ikey.user_key,
Slice(current_user_key)) != 0) { //lzh: 表示当前的 ikey.user_key 与 current_user_key 不相等
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber; //lzh: 当前 ikey.user_key 是首次遇到,设置 last_sequence_for_key=kMaxSequenceNumber 保证当前 ikey 不被 drop
}
//lzh: 当前 user_key 的上一个版本 <= snapshot,我们把上一个版本保留下来,以满足 snapshot 的功能:为每个 user_key 保留一个小于等于 snapshot 的最大版本号。
//lzh: 从当前版本开始(包括在内),后续所有当前 user_key 的更旧版本可以 drop 了。
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key))
{
//lzh: 若当前 user_key 上一个版本 > snapshot,但当前 user_key 的 type 是 deletion,那么什么情况下可以 drop 掉这个版本呢?
//lzh: 1.当前是要将 level_ 层和 level_+1 层 compact,如果当前 user_key 出现在比level_+1 层更高的层次,那么这个 kv 必然不能被 drop 掉,
//lzh: 否则,其它层可能存在的更旧的版本就会被使用,这将导致逻辑错误。
//lzh: 2.若当前版本大于数据库设置的最小快照号 smallest_snapshot 仍然将它 drop 掉
//lzh: 注意当前的 user_key 是 deletion 版本,为了保持逻辑的正确性:相同 user_key 的 deletion 版本后面的版本是无效的,还需要将此 user_key 后续版本全部 drop
//lzh: 但是这样一来快照功能的逻辑就错了:我们的查找范围是所有小于等于 snapshot 的版本。因为我们已经删除了所有的较旧版本的 user_key,所以使用快照功能也无法访问了,
//lzh: 虽然在一个较旧的时刻,user_key 最新的版本是有效的,在快照下应该是可访问的。
//lzh: 综上,所以判断的条件有以上三个。
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
//lzh: 在赋值之前 last_sequence_for_key 指的是前一次遍历到的当前 user_key 的版本。若当次是首次遍历到当前 user_key 则此值在赋值前为 kMaxSequenceNumber
last_sequence_for_key = ikey.sequence;
}
#if 0
Log(options_.info_log,
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
"%d smallest_snapshot: %d",
ikey.user_key.ToString().c_str(),
(int)ikey.sequence, ikey.type, kTypeValue, drop,
compact->compaction->IsBaseLevelForKey(ikey.user_key),
(int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif
//lzh: 当前 input->key 被 drop 掉, 不用 compact 到文件中
if (!drop) {
// Open output file if necessary
//lzh: 如果首次进入此分支, compaction 没有初始化(没有生成 file_number, 没有生成文件, 建立 sst 对应的内存 table), 那么需要初始化
if (compact->builder == NULL) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
//lzh: 如果 sst 文件对应的内存 table 中没有数据, 则当前被加入的 key 作为 smallest. (注意 key 是正序遍历的, 所以此逻辑是正确的)
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
//lzh: key 是正序遍历的,所以每次新加入的 key 就是最大值
compact->current_output()->largest.DecodeFrom(key);
//lzh: (key, value) 加入到 sst 内存 table 中
compact->builder->Add(key, input->value());
// Close output file if it is big enough
//lzh: sst 内存 table 满了,需要 dump
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}
input->Next();
}
if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::IOError("Deleting DB during compaction");
}
//lzh: 最后一轮循环中正在构建的 sst 大小没有达到阈值还没来得及写入磁盘, 此处需要处理
if (status.ok() && compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status = input->status();
}
delete input;
input = NULL;
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
for (size_t i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
//lzh: 最终将所有生成的 sst 文件应用到内存版本管理, sst 文件层次管理中
if (status.ok()) {
status = InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
/************************************************************************/
/*
lzh:
要点1:
leveldb 提供了两种触发 compaction 的条件/方式:
size_compaction: 基于文件大小触发的 compaction. 这样做的动机是 leveldb 的层次控制: 高层的文件比低层的要大, 约 10 倍
seek_compaction: 基于文件“被经过但不命中”的次数达到阈值触发的 compaction.
这样做的动机是消灭稀疏的文件, 将它放入更高层的文件中, 提高文件的稠密度提高查询效率.
文件太稀疏了: 文件的区间(smallest--largest)太大了, 含有的 kv 对只有那么多, 很多查找的键落在此范围但不被命中
这会形成查找的阻碍,将此文件填充到高一层文件中, 提高查询效率
要点2:
c->inputs_[0] 是 level 层需要 compact 的文件集合
c->inputs_[1] 是 level+1 层需要 compact 的文件集合
*/
/************************************************************************/
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
//lzh; 优先执行 size_compaction
const bool size_compaction = (current_->compaction_score_ >= 1);
//lzh: 在 db->Get 中若对多于一个的 sst 文件/缓存 seek 的次数过多, 则 current_->file_to_compact_ 会被设置
const bool seek_compaction = (current_->file_to_compact_ != NULL);
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level+1 < config::kNumLevels);
c = new Compaction(level);
// Pick the first file that comes after compact_pointer_[level]
//lzh: 选出首个最大值大于 compact_pointer[level] 的 sst 文件
//lzh: http://catkang.github.io/2017/02/03/leveldb-version.html Version中会记录每层上次Compaction结束后的最大Key值compact_pointer_,
//lzh: 下一次触发自动Compaction会从这个Key开始。容量触发的优先级高于下面将要提到的Seek触发。
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() || //lzh: 本层 compact_pointer_ 以前的文件都已经被 compact 过
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) //lzh: compact_pointer_ 以后的(没 compact 过)文件纳入 compact 范围
{
c->inputs_[0].push_back(f);
break;
}
}
//lzh: c->inputs_[0].empty() 说明 level 层所有的文件之前都已经 compact 过. 但是本函数仍被调用, 进一步说明由于某些情况(如本层 size 仍较大, 或触发了 seek_compaction)
//lzh: 需要再次 compact, 所以从本层第一个文件开始.
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(level);
//lzh: 将 level 层的 current_->file_to_compact_ 文件 compact 到 level+1 层
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return NULL;
}
c->input_version_ = current_;
c->input_version_->Ref();
// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
GetOverlappingInputs(0, smallest, largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
//lzh: 找到另外 compact 的输入文件
SetupOtherInputs(c);
return c;
}
/************************************************************************/
/*
lzh: 此函数被两处调用: PickCompaction 和 CompactRange
c->inputs_[0] 简写为 c[0], c->inputs_[1] 简写为 c[1]
1. PickCompaction 中 level 上的一个待 compact 的文件集合是 c[0]. (注意第 0 层的特殊情况. 详见 PickCompaction 最末的处理)
2. 由上面的 c[0] 与 level+1 层相交的文件得到 c[1], c[0],c[1] 范围是 (all_start, all_limit)
3. 若 c[1] 不为空则继续扩展: level 层与 (c[0] 并 c[1]) 相交的文件集合得到 expanded0, 转到 4. 否则转到 6.
4. 若 expanded0 的大小大于 c[0] 则继续扩展: level+1 层与 expanded0 相交的文件集合得到的 expanded1. 否则转到 6.
5. 若 expanded1 的大小大于 c[1] 则扩展至此, c[0]=expanded0, c[1]=expanded1, c[0] 范围是 (smallest, largest), c[0]并c[1] 范围是 (all_start, all_limit). 否则转到 6.
6. 计算 c[0]并c[1] 与 level+2 相交文件集合作为 c->grandparents_. 计算 level 层下一次 compact 的起始位置是 largest, 即本次compact 的上界c[0].largest
*/
/************************************************************************/
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
GetOverlappingInputs(level+1, smallest, largest, &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
GetOverlappingInputs(level, all_start, all_limit, &expanded0);
if (expanded0.size() > c->inputs_[0].size()) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
GetOverlappingInputs(level+1, new_start, new_limit, &expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d to %d+%d\n",
level,
int(c->inputs_[0].size()),
int(c->inputs_[1].size()),
int(expanded0.size()),
int(expanded1.size()));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < config::kNumLevels) {
GetOverlappingInputs(level + 2, all_start, all_limit, &c->grandparents_);
}
if (false) {
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
level,
EscapeString(smallest.Encode()).c_str(),
EscapeString(largest.Encode()).c_str());
}
// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}