leveldb的Compact分为两类,一类是内存里的MemTable满了,需要将MemTable中的数据写入磁盘,这个过程叫做Minor Compaction,另一类是磁盘上的SSTable文件过多了,需要将它们合并成更大的SSTable文件,这个过程叫做Major Compaction。
Major Compaction
由于leveldb的LSM-Tree结构,数据总是以追加的形式写入磁盘,更新和删除操作并不会直接修改磁盘上的数据,而是通过在MemTable中记录一个新的版本来实现的(leveldb的内部存储键是一个userKey+tag的组合形成的InternalKey,这个tag中包含了seqnum和操作的类型如Put和Delete),每次写(包括删除)操作必然导致数据的增加,一方面会导致磁盘空间的浪费,另一方面也会导致查询效率的降低。因此需要定期进行Major Compaction,将多个SSTable文件合并成一个更大的SSTable文件,在这个过程中会去掉已经被更新或者删除的数据,从而减少磁盘空间的占用,提高查询效率。
Major Compaction的自动Compaction过程是一个后台线程,主要的代码调用流程如下:
MaybeScheduleCompaction():入口,判断是否需要进行compaction,如果需要则将BackgroundCall()投递到后台线程池执行
│
▼
BackgroundCall():后台线程中执行的包装函数,调用BackgroundCompaction()完成一次compaction的实际工作,compaction完成后再次调用MaybeScheduleCompaction()检查是否需要继续进行compaction
│
▼
BackgroundCompaction():compaction的核心调度逻辑,负责选文件、读入、归并、替换版本,如果是trivial move则直接更新版本,否则调用
│
▼
DoCompactionWork():真正干活的函数,执行多路归并、去重、丢弃过期版本、写出新SSTable
核心在BackgroundCompaction与DoCompactionWork两个函数,前者负责选文件和调度,后者负责实际的归并和输出。
整体的compaction流程是从某个level层里选出一个文件,然后将level+1层里所有与这个文件有相交的文件都选出来进行归并形成一个新的文件放到level+1里,再适当删除文件。但还有些细节会导致level层的文件不止一个被选中进行compaction,下面先说下选文件的逻辑,再说下compaction的具体流程。
选文件
首先同一层的文件是有序且不重叠的(level0除外),所以Compaction不发生在同一层文件之间,而是发生在相邻的两层之间,Compaction的输入文件包包括level中被选中的文件和level+1中所有与这些文件有重叠的文件,Compaction的输出文件会被放到level+1层中。
选文件的逻辑在VersionSet::PickCompaction()中实现,由BackgroundCompaction()调用,但在具体看代码前要说下leveldb中关于要压缩的件的两个策略
size_compaction
要压缩的一个原因就是因为文件太多了,leveldb为每层设计了一个固定的最大大小(10M,100M,1G…),某层的文件越接近阈值说明越需要进行compaction,这个计算发生在VersionSet::Finalize()中,leveldb在产生一个新的version时会调用Finalize()函数来计算每层的compaction score,score越大说明该层越需要进行compaction.level0的计算是以文件数量来衡量,因为level0的文件是可能重叠的,每次读都要遍历所有文件,文件数量非常影响读性能。由此,score的计算方式如下:
- level0的score = level0文件数 / kL0_CompactionTrigger
- level1及以上的score = level层的总大小 / MaxBytesForLevel(level)
计算完成后会将score最大的层记录到version当中,当要发生compaction时PickCompaction会直接从version读取到这个信息
seek_compaction
除了大小这个因素外,leveldb还会根据文件的seek次数来判断该文件是否需要进行compaction。简单讲就是一个文件被seek的次数很多但是又没有命中,说明该文件需要进行compaction以提高查询效率。这个seek的次数记录在文件当中,当生成一个sst文件时其filemeta的allowed_seek会进行相应的设置,代码如下:
// We arrange to automatically compact this file after
// a certain number of seeks. Let's assume:
// (1) One seek costs 10ms
// (2) Writing or reading 1MB costs 10ms (100MB/s)
// (3) A compaction of 1MB does 25MB of IO:
// 1MB read from this level
// 10-12MB read from next level (boundaries may be misaligned)
// 10-12MB written to next level
// This implies that 25 seeks cost the same as the compaction
// of 1MB of data. I.e., one seek costs approximately the
// same as the compaction of 40KB of data. We are a little
// conservative and allow approximately one seek for every 16KB
// of data before triggering a compaction.
f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
上面的注释已经解释了seek的计算方式,简单来说就是考虑compaction的成本和seek的成本,当seek该块浪费的时间已经和进行以此compaction一样时就可以触发compaction了。
具体逻辑
了解两个策略后就可以看PickCompaction的代码了,具体代码如下:
Compaction* VersionSet::PickCompaction() {
// Compaction主要就是一个记录要被合并的两层文件的结构体,核心成员是
// std::vector<FileMetaData*> inputs_[2];
Compaction* c;
int level;
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {
// 如果是size_compaction触发,就直接从version中读取要进行compaction的层级,但是Compaction的进行是要选定一个文件开始的,下面的for循环逻辑就是为了从level里选出要compaction的层
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);
// Pick the first file that comes after compact_pointer_[level]
// 选的逻辑是round-robin的方式,compaction_pointer_记录了每一层上次进行compaction时选取的文件的largest key
// 下次该层进行compaction时就直接从这个largest key后的第一个文件开始选取,这样做的目的是防止某个key范围内的文件一直被选中而其它范围内的文件一直没被compaction到
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// 整个for循环走完了没找比上次compaction更大的key,说明走到末尾了,直接从头开始选第一个文件进行compaction
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return nullptr;
}
// 处理并行的一个操作,因为compaction可能与用户的读写并行运行,这里leveldb的做法是将当前version的一个快照记录到compaction当中,使得整个compaction过程都是在这个version当中进行
// 因为已经生成的sst文件不会被修改,compaction过程中只要知道当时有哪些文件就行,用户的读写并不会影响这些文件的状态
c->input_version_ = current_;
c->input_version_->Ref();
// Files in level 0 may overlap each other, so pick up all overlapping ones
// 如果是level0的文件被选中要进行compaction了,由于level0之间的文件是有重叠的,所以需要将所有与被选中的文件有重叠的文件都选出来进行compaction,level1及以上的文件是没有重叠的,所以只需要选中一个文件就行了
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
// 到了这里只是选出了level层的文件,level+1层的文件由下面的SetupOtherInputs()函数选出
SetupOtherInputs(c);
return c;
}
上面大部分逻辑都是在进行level层的文件选择,可以看到除了level0的文件需要进行重叠检查,其它层的文件都是选一个文件,然后在SetupOtherInputs寻找level+1层的文件,但是这个SetupOtherInputs函数可能会扩展level层的文件集合,具体原因如代码注释:
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
// 这个AddBoundaryInputs的作用是将level层中与被选中的文件有重叠的文件也加入到该层要被compaction的集合中
// 前面说过同层文件是有序且不重叠的,这里的不重叠是指InternalKey的不重叠,同一个userKey+tag形成InteralKey可能会被分到两个sst中
// 考虑这样一种情况,有两个文件b1,b2它们的key范围是[l1,u1],[l2,u2],userKey(u1) == userKey(l1)
// 这时如果选中b1进行compaction了,那b2会留在level,下次查找时会优先找到b2(因为b1被压到下一层去了)而b2相比b1来说是过时的(同一userKey更大的seqnum会放到更前)导致读取到错误数据
// 所以选好初始文件后要调用该函数进行这种扩充,下面的level+1层的文件选好后也会调用该函数扩充
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
// 下面的操作是一个优化,目的是在不增加level+1层的文件数量前提下尽可能增加level层的文件数
// 原因如下图所示:
/*Level 1: [A──C] [E──G] [H──K] ← 只选中了 [A,C]
↑ picked
Level 2: [A─────────────G] [H──K]
└── inputs_[1] ──┘
*/
// 在Compaction时I/O的成本主要是对level+1层的文件进行读取,如果已经要对level+1中的文件进行读取了,尽可能多将level层的文件进行compaction,因为比起读取level+1层的文件,读取level层的文件成本要小,又能尽量减少level层的文件数量,达到更好的压缩效果,上图中可以将[E,G]加入
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
AddBoundaryInputs(icmp_, current_->files_[level], &expanded0);
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
&expanded1);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
level, int(c->inputs_[0].size()), int(c->inputs_[1].size()),
long(inputs0_size), long(inputs1_size), int(expanded0.size()),
int(expanded1.size()), long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
...
}
Compaction具体逻辑
选出要进行compaction的文件后就是开始进行真正的compaction了,compaction的核心逻辑就是一个多路归并的过程,也就是将Compaction的inputs_[0]和inputs_[1]中的文件进行归并,去掉过期版本,丢弃被删除的key,最后将新的文件写入磁盘并更新version。整体的逻辑在BackgroundCompaction()和DoCompactionWork()中实现,前者负责调度,后者负责具体的归并和输出。
TrivialMove
在BackGroundCompaction中调用PickCompaction选出要进行compaction的文件后有一种特殊情况可以直接将level层的文件移动到level+1层,这种情况需要满足以下条件:
- level层只有一个文件被选中
- level+1层没有与这个文件有重叠的文件,也就是level+1层没有文件要compation
- 跟level+2层的重叠文件不超过25M(这个条件是防止该文件直接移动到level+1层后后续与level+2层进行compaction时压力过大)
如果上述条件满足就直接将level层的文件移动到level+1层,更新version后就完成了compaction的过程,这样就避免了进行多路归并的过程,提高了效率。
DoCompactionWork
如果不满足TrivialMove的条件就需要进行正常的compaction了,compaction的核心逻辑在DoCompactionWork()中实现,主要包括以下几个步骤
参考连接
https://izualzhy.cn/archive.html?tag=leveldb
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 2128099421@qq.com