网易首页 > 网易号 > 正文 申请入驻

TiKV Raft 快照全流程丨TiKV 源码解读(二十二)

0
分享至

导读

TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会的顶级项目。它通过 Raft 协议实现数据的高可用性和强一致性,是 TiDB 分布式数据库系统的重要组成部分。本文作为 TiKV 源码解读系列的增补,详细介绍了 TiKV 8.2.0 版本中 Raft 快照的生成、发送、接收和应用的具体实现。

在 Snapshot 的发送和接收中,我们详细介绍了 Raft 快照的发送和接收机制。本篇文章作为前文的补充,将概述 TiKV 中 Raft 快照的整体流程,并详细介绍涉及的代码路径。本文所讨论的代码基于最近的 8.2.0 版本。

在 TiKV 中,数据空间被切分成各个连续的范围,称为 Region。每个 Region 由一个单独的 Raft 组管理,基于 Raft 协议保证容错性。每个 Raft 组包含多个 Peer,每个 Peer 在不同的 TiKV 节点上运行。

图 1. TiKV 中的 Region 和 Raft 组

在 Raft 协议中,Leader 节点负责将最新的日志条目通过 Append RPC 发送给 Followers,以确保所有节点的数据一致性。为防止磁盘空间的无限制增长,Leader 节点会定期清理过时的本地 Raft 日志。然而,如果某个 Follower 落后太多,以至于其请求的日志条目已经被 Leader 清理,传统的 Append RPC 将无法继续同步数据。在这种情况下,Leader 节点将采取替代措施,向落后的 Follower 发送一个 Raft 快照。这个快照包含了 Region 在某一特定时间点的完整状态快照,不仅包括存储于 RocksDB 中的数据,还有 Raft 协议的状态信息,例如任期号(term)和所对应的日志索引(index)。在 TiKV 的实际应用中,快照机制通常在 Region 初始化、发生分裂或进行扩展等关键操作时被触发,以确保数据的一致性和系统的稳定性。

TiKV 中的 Raft 快照过程大致分为四个阶段:

1. 生成:Raft Leader 生成一份快照,记录下 Raft 和 RocksDB 在当前时间点的状态。

2. 发送:Raft Leader 通过网络把快照发送给 Follower。

3. 接收:Raft Follower 接收快照并暂时存储。

4. 应用:Raft Follower 将快照应用到 Raft 状态机和 RocksDB 数据中。

以下图表更详细地描述了快照过程:

图 2. TiKV 中的 Raft 快照过程

本文将详细介绍 Raft 快照实现的原理和流程,这个部分将介绍核心环节的设计思路,在代码路径详解中我们会逐步介绍细节。

快照元数据和数据的分离

Raft 消息 (protobuf definition:https://github.com/pingcap/kvproto/blob/df42997c2c57536219c67253966ede4d61d25757/include/eraftpb.proto#L77) 有一个 snapshot 字段,可以存储快照数据和元数据。一个简单的做法是将快照数据嵌入到 Raft 消息中,通过 Raft Peer 之间的标准通信通道传输(如图顶部的消息队列所示)。但问题在于 Raft 快照比其它消息大得多,将快照数据放入 Raft 消息中可能会阻塞正常的 Raft 消息处理逻辑。

因此,TiKV 中的一个设计选择是,Raft 快照消息(表示为 MsgSnapshot)仅包含快照的元数据。实际的快照数据作为文件保存在磁盘上。快照文件以及快照消息通过专用的 gRPC 流连接由 Snap Worker 发送(如步骤 7 和 8 所示)。使用 gRPC 流可以将数据分成更小的块以进行高效传输。

从 ApplyFsm 调度RegionTask::Gen

Region Worker 负责生成和应用快照数据。例如,在快照的应用过程中,PeerStorage 调度了一个 RegionTask::Apply 任务给 Region Worker(步骤 11)。但是快照的生成过程略有不同,尽管 PeerStorage 依然是整个过程的发起点,但是 RegionTask::Gen 任务是通过 ApplyFsm 来调度的(步骤 2 和 3)。为什么 PeerStorage 不直接进行 RegionTask::Gen 的调度呢?

这是为了控制快照生成的时间点,让快照尽可能地包含最新的数据。在 Raft 批处理系统中,Raft 信息是分批处理的。同一批信息中可能同时包含快照请求和新的写入请求,而我们希望快照在同批次的写入都完成之后生成。因此,Raft 批处理系统会先把同批次的所有写入任务发给 Apply 批处理系统处理,然后再分派快照任务(ApplyTask::Snapshot)。这样,因为 ApplyFsm 是对任务依次进行处理的,当它处理到快照任务的时候,同批次的写入已经完成。

我们将逐步介绍不同阶段的代码路径,各步骤与上图一一对应。不过这里包含的代码片段很简化,省略了许多细节,仅用于展示大致的流程。代码基于 TiKV 8.2.0 版本。

快照生成

步骤 1: GenSnapTask

TiKV 中实现 Raft 共识协议的是 raft-rs 模块 ( https://github.com/tikv/raft-rs ),快照过程在该模块中发起。在 raft-rs 中,Leader 对每一个 Follower 维护一个 Progress 对象,其中记录了该 Follower 所需要的下一个日志索引(pr.next_idx)。Raft leader 在 maybe_send_append 中处理某个 Follower 的 Append RPC 的发送,如果它无法获取前置日志(pr.next_idx - 1)的任期(用于 Append 过程的匹配校验),则需要发送快照。此时会调用 prepare_send_snapshot 函数,触发快照过程。

raft-rs: src/raft.rs
impl RaftCore {
fn maybe_send_append(..., pr: &mut Progress, ) {
...
let term = self.raft_log.term(pr.next_idx - 1);
match (term, ents) {
...
_ => {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return false;
}
}
}
}
fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
// ...
}
}
raft-rs: src/raft_log.rs
impl RaftLog {
pub fn snapshot(&self, request_index: u64, to: u64) -> Result
// ...
self.store.snapshot(request_index, to)
}
}

如上所示,快照过程经过若干调用后来到 Storage trait 的 snapshot 方法(定义:https://github.com/tikv/raft-rs/blob/2aefbf627f243dd261b7585ef1250d32efd9dfe7/src/storage.rs#L159)。在 TiKV 中,Storage trait 的实现是 PeerStorage,其 snapshot 实现如下:

components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
pub fn snapshot(&self, request_index: u64, to: u64) -> raft::Result
// ...
let task = GenSnapTask::new(...);
let mut gen_snap_task = self.gen_snap_task.borrow_mut();
*gen_snap_task = Some(task);
Err(raft::Error::Store(
raft::StorageError::SnapshotTemporarilyUnavailable,
))
}
}

构建了一个 GenSnapTask 并将其设置在 gen_snap_task 字段,然后它会返回一个 SnapshotTemporarilyUnavailable 错误,这个错误意味着快照正在生成过程中。在之后的过程中,函数会在 Raft 协议的每次心跳时被重新调用。如果快照生成未完成,它会继续返回 SnapshotTemporarilyUnavailable。当快照生成完毕后(步骤 5),snapshot() 的调用就会返回 Ok。

PeerStorage::snapshot

snapshot()

步骤 2: ApplyTask::Snapshot

函数检查PeerStorage 的 字段,并将任务发送给 ApplyFsm。如前所述,PeerFsm 会先给 ApplyFsm 发送该批次中所有的写入任务(见 函数),再发送快照任务。

Peer::handle_raft_ready_append

gen_snap_task

handle_raft_committed_entries

components/raftstore/src/store/peer.rs
impl Peer {
pub fn handle_raft_ready_append(...){
// ...
if !ready.committed_entries().is_empty() {
self.handle_raft_committed_entries(ctx, ready.take_committed_entries());
}
if let Some(mut gen_task) = self.mut_store().take_gen_snap_task() {
ctx.apply_router.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
}
}
fn handle_raft_committed_entries
let mut apply = Apply::new(...)
ctx.apply_router.schedule_task(self.region_id, ApplyTask::apply(apply));
}
}

步骤 3: RegionTask::Gen

在 ApplyFsm::handle_snapshot 函数中处理快照任务。它将快照任务转换为 RegionTask::Gen ,并发送给 Region Worker。

ApplyFsm

components/raftstore/src/store/fsm/apply.rs
impl ApplyFsm {
fn handle_tasks() {
loop {
match msg {
Msg::Snapshot(snap_task) => self.handle_snapshot(..., snap_task),
}
}
}
fn handle_snapshot(..., snap_task: GenSnapTask) {
snap_task.generate_and_schedule_snapshot()
}
}
impl GenSnapTask {
pub fn generate_and_schedule_snapshot(){
let snapshot = RegionTask::Gen {...}
region_sched.schedule(snapshot)
}
}

步骤 4 和 5: do_snapshot() and notify

Region Runner 定义了 Region 任务的处理逻辑。一系列函数调用到达 ,由它完成实际的快照生成工作,包括从 RocksDB 扫描 Region 的数据并写入 SST 文件。注意,快照生成工作(ctx.handle_gen)是在一个单独的线程池中,主要的考虑是因为它耗时较长,避免阻塞其它任务。

do_snapshot

components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
fn run(&mut self, task: Task
match task {
Task::Gen {...} => {
let ctx = SnapGenContext {...}
self.pool.spawn(async move {
ctx.handle_gen(...)
}
}
}
}
}
impl SnapGenContext {
fn generate_snap( ... ) {
let snap = box_try!(store::do_snapshot::
notifier.try_send(snap)
}
fn handle_gen( ... ) {
self.generate_snap(...)
}
}

生成完成后,notifier.try_send(snap) 将快照生成结果发送到一个通道,结果将使 PeerStorage::snapshot() 在下一次调用中返回 Ok。

快照发送

步骤 6 和 7: MsgSnapshot and send_snap()

成功后返回一个 Snapshot 结果。不过这个快照只包含元数据,快照的数据依然以 SST 文件的形式存储在磁盘上。

PeerStorage::snapshot()

src/server/raft_client.rs
impl AsyncRaftSender {
fn fill_msg(&mut self, ctx: &Context<'_>) {
// ...
if msg.get_message().has_snapshot() {
self.send_snapshot_sock(msg);
continue;
}
// ...
}
fn send_snapshot_sock(&self, msg: RaftMessage) {
if let Err(e) = self.snap_scheduler.schedule(SnapTask::Send {...}) {
// ...
}
}
}

拦截快照消息并将其转换为快照发送任务。任务被发送到 Snap Scheduler,由 Snap Worker 来处理。

AsyncRaftSender

src/server/snap.rs
impl Runnable for Runner {
fn run(&mut self, task: Task) {
match task {
Task::Send { addr, msg, cb } => {
let send_task = send_snap(...);
let task = async move {
let res = match send_task {
Err(e) => Err(e),
Ok(f) => f.await,
};
// ...
}
self.pool.spawn(task);
}
}
}
}
pub fn send_snap(...) {
// ...
}

Snap Runner 中定义了 Snap Worker 在处理不同任务时的处理逻辑,对于快照发送任务(Task::Send),Snap Runner 生成一个新的异步任务来运行 send_snap 函数。send_snap 通过打开一个新的 gRPC 流连接来传输快照消息及快照数据。

快照接收

步骤 8 和 9: recv_snap() and MsgSnapshot

在接收端,TiKV 实例看到传入的 gRPC 请求,通过调度一个 recv snap 任务来将请求转发给 Snap Worker。

src/server/service/kv.rs
impl Tikv for Service {
fn snapshot(...) {
let task = SnapTask::Recv { stream, sink };
if let Err(e) = self.snap_scheduler.schedule(task) {...}
}
}

Snap Worker 在 recv_snap 函数中接收快照元数据和内容。

src/server/snap.rs
impl Runnable for Runner {
fn run(&mut self, task: Task) {
match task {
Task::Recv { ... } => {
let task = async move {
let result = recv_snap(...).await;
}
self.pool.spawn(task);
}
}
}
}
fn recv_snap(...) {
let mut context = RecvSnapContext::new(head, &snap_mgr)?;
while let Some(item) = stream.next().await {
// ...
}
context.finish(raft_router)
}

快照接收后,context.finish(raft_router) 将快照消息发送到 Raftstore 以触发快照的应用。

快照应用

步骤 10 到 12: apply_snapshot()和 apply_snap()

快照在不同层级和不同地方被应用:

  1. 快照信息被 Raftstore 收到之后,会调用 raft_rs 的 step 函数。经过一系列的调用,到达 Raft::handle_snapshot ,在该函数中恢复 Raft 状态机的日志和配置。

raft-rs: src/raft.rs
impl Raft {
fn handle_snapshot(&mut self, mut m: Message) {
if self.restore(m.take_snapshot()) {...}
}
pub fn restore(&mut self, snap: Snapshot) -> bool {
...
self.raft_log.restore(snap);
}
}

  1. Raftstore 在对 ready 处理时 Peer::handle_raft_ready_append 会调用 函数,该函数会调用PeerStorage::apply_snapshot 来更新 Peer 的状态。在 Snapshot 被持久化之后,PeerStorage::on_persist_snapshot 函数会被调用,它会进一步调用PeerStorage::persist_snapshot 将快照应用任务发送给 Region Worker。
  2. PeerStorage::handle_raft_ready

components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
pub fn apply_snapshot() {...}
pub fn persist_snapshot(&mut self, res: &PersistSnapshotResult) {
self.schedule_applying_snapshot();
}
pub fn schedule_applying_snapshot(&mut self) {
let task = RegionTask::Apply {}
if let Err(e) = self.region_scheduler.schedule(task) {...}
}
}

  1. Region Worker 对快照应用任务进行处理,经过一系列调用会来到 Runner::apply_snap,它会将 Region 中的数据更新为快照中的数据。

components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
fn run(&mut self, task: Task
match task {
Task::Apply { .. } => {
self.pending_applies.push_back(task);
self.handle_pending_applies(false);
}
}
}
fn handle_pending_applies(&mut self, is_timeout: bool) {
while !self.pending_applies.is_empty() {
if let Some(Task::Apply { region_id, .. }) = self.pending_applies.front() {
if let Some(Task::Apply {}) = self.pending_applies.pop_front() {
self.handle_apply(region_id, peer_id, status);
}
}
}
}
fn handle_apply(&mut self, region_id: u64, peer_id: u64, status: Arc
match self.apply_snap(region_id, peer_id, Arc::clone(&status)) {...}
}
fn apply_snap(&mut self, region_id: u64, peer_id: u64, abort: Arc
// ...
}
}

) -> Result<()> {

以上便是 TiKV 中与快照相关的代码路径概览。希望通过本文的介绍,能帮助读者更深入地理解 TiKV 中的 Raft 快照机制及其实现细节,从而更有效地进行源码阅读和学习。

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
从这次俄乌冲突中我看到了一个大写的人!他头发蓬乱但充满正义!

从这次俄乌冲突中我看到了一个大写的人!他头发蓬乱但充满正义!

翻开历史和现实
2025-03-04 16:09:07
三级片女港星嫁山东农村!老公一句话让全网炸锅:这婚结得真值!

三级片女港星嫁山东农村!老公一句话让全网炸锅:这婚结得真值!

美食阿鳕
2025-03-07 08:43:46
中国最北、最南、最东、最西的四大城市,一辈子必须去一次!

中国最北、最南、最东、最西的四大城市,一辈子必须去一次!

布拉旅游说
2025-03-05 12:27:43
两会开幕!养老金调整有新消息吗?今年平均上涨比例或不低于3%

两会开幕!养老金调整有新消息吗?今年平均上涨比例或不低于3%

王五说说看
2025-03-07 06:54:37
十三世达赖为什么出逃印度

十三世达赖为什么出逃印度

归史
2025-03-06 15:16:07
被香港富婆包了一个月,我产生了心理阴影,付出了惨痛的代价

被香港富婆包了一个月,我产生了心理阴影,付出了惨痛的代价

神奇的锤子
2025-02-27 16:56:44
俄乌冲突提醒中国:我们应该在西部沙漠,屯够300万人的武器弹药

俄乌冲突提醒中国:我们应该在西部沙漠,屯够300万人的武器弹药

车马点兵V
2025-03-05 09:59:46
市长书记要求县书记推荐指定的女干部,县书记找她们谈话套出秘密

市长书记要求县书记推荐指定的女干部,县书记找她们谈话套出秘密

实话叔
2023-12-27 15:29:28
吉利李书福谈汽车价格战:打到剩下几家有竞争力的车企为止

吉利李书福谈汽车价格战:打到剩下几家有竞争力的车企为止

IT之家
2025-03-06 11:23:10
特朗普和马斯克“惹祸了”?!2月裁员飙升,贸易逆差扩大至历史新高 数据“实锤”经济衰退将至?

特朗普和马斯克“惹祸了”?!2月裁员飙升,贸易逆差扩大至历史新高 数据“实锤”经济衰退将至?

FX168北美财经圈
2025-03-07 01:23:12
马筱梅真霸气!具俊晔前脚搬进大S豪宅,她后脚和汪小菲接走玥儿

马筱梅真霸气!具俊晔前脚搬进大S豪宅,她后脚和汪小菲接走玥儿

小娱乐悠悠
2025-03-06 15:05:46
独显再无用武之地!曝AMD下代APU游戏性能媲美RTX 5070 Ti

独显再无用武之地!曝AMD下代APU游戏性能媲美RTX 5070 Ti

快科技
2025-03-04 11:24:33
探秘 Sybil A:别样领域的闪耀 “精灵”

探秘 Sybil A:别样领域的闪耀 “精灵”

AI余乐
2025-03-06 15:01:22
她才是最“旺夫”的女人——叶莲娜·泽连斯卡亚

她才是最“旺夫”的女人——叶莲娜·泽连斯卡亚

大风文字
2025-03-05 08:50:07
恭喜!姚明正式上任,官宣新岗位,多年付出获最好回报

恭喜!姚明正式上任,官宣新岗位,多年付出获最好回报

体育有点水
2025-03-01 12:22:45
本菲卡破尴尬纪录:从没有一支球队在人数占优68分钟的情况下输球

本菲卡破尴尬纪录:从没有一支球队在人数占优68分钟的情况下输球

直播吧
2025-03-06 19:22:08
姚安娜新剧饰演急诊医生演技自然,网友感叹之前还是对她太大声了!

姚安娜新剧饰演急诊医生演技自然,网友感叹之前还是对她太大声了!

北青网-北京青年报
2025-03-06 18:16:04
张豆豆大卷发+机车狂飙!扩展个人商业版图,不做孙杨的保姆管家

张豆豆大卷发+机车狂飙!扩展个人商业版图,不做孙杨的保姆管家

杨华评论
2025-03-06 20:08:07
刘素英代表:“无堂食”外卖成监管盲区,应加强平台准入审核

刘素英代表:“无堂食”外卖成监管盲区,应加强平台准入审核

澎湃新闻
2025-03-07 08:42:26
詹姆斯破僵局,恩佐替补建功!切尔西终结4连败,2-1剑指八强!

詹姆斯破僵局,恩佐替补建功!切尔西终结4连败,2-1剑指八强!

钉钉陌上花开
2025-03-07 03:40:30
2025-03-07 09:59:00
PingCAP
PingCAP
分布式数据库TiDB背后团队
542文章数 624关注度
往期回顾 全部

科技要闻

星舰第八飞:又夹住了助推器 但飞船被引爆

头条要闻

老人在养生门店一年多消费47万 家人在其去世后才发现

头条要闻

老人在养生门店一年多消费47万 家人在其去世后才发现

体育要闻

1次射正就绝杀!欧冠诞生史诗级系统局

娱乐要闻

曝具俊晔承担大S房贷,每月还百万

财经要闻

信息量巨大!五部门"掌门人"重磅发

汽车要闻

6.98万元起 埃安UT定价背后的决心与野心

态度原创

教育
家居
健康
艺术
时尚

教育要闻

怎样帮助自己的孩子一步步戒断网瘾?

家居要闻

法式浪漫 时尚轻奢人文

戴耳机超1小时=听力慢性自杀?

艺术要闻

故宫珍藏的墨迹《十七帖》,比拓本更精良,这才是地道的魏晋写法

推广|| 挖到新宝!在时装周的每一天都离不开它

无障碍浏览 进入关怀版