网易首页 > 网易号 > 正文 申请入驻

TiKV Raft 快照全流程丨TiKV 源码解读(二十二)

0
分享至

导读

TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会的顶级项目。它通过 Raft 协议实现数据的高可用性和强一致性,是 TiDB 分布式数据库系统的重要组成部分。本文作为 TiKV 源码解读系列的增补,详细介绍了 TiKV 8.2.0 版本中 Raft 快照的生成、发送、接收和应用的具体实现。

在 Snapshot 的发送和接收中,我们详细介绍了 Raft 快照的发送和接收机制。本篇文章作为前文的补充,将概述 TiKV 中 Raft 快照的整体流程,并详细介绍涉及的代码路径。本文所讨论的代码基于最近的 8.2.0 版本。



在 TiKV 中,数据空间被切分成各个连续的范围,称为 Region。每个 Region 由一个单独的 Raft 组管理,基于 Raft 协议保证容错性。每个 Raft 组包含多个 Peer,每个 Peer 在不同的 TiKV 节点上运行。



图 1. TiKV 中的 Region 和 Raft 组

在 Raft 协议中,Leader 节点负责将最新的日志条目通过 Append RPC 发送给 Followers,以确保所有节点的数据一致性。为防止磁盘空间的无限制增长,Leader 节点会定期清理过时的本地 Raft 日志。然而,如果某个 Follower 落后太多,以至于其请求的日志条目已经被 Leader 清理,传统的 Append RPC 将无法继续同步数据。在这种情况下,Leader 节点将采取替代措施,向落后的 Follower 发送一个 Raft 快照。这个快照包含了 Region 在某一特定时间点的完整状态快照,不仅包括存储于 RocksDB 中的数据,还有 Raft 协议的状态信息,例如任期号(term)和所对应的日志索引(index)。在 TiKV 的实际应用中,快照机制通常在 Region 初始化、发生分裂或进行扩展等关键操作时被触发,以确保数据的一致性和系统的稳定性。



TiKV 中的 Raft 快照过程大致分为四个阶段:

1. 生成:Raft Leader 生成一份快照,记录下 Raft 和 RocksDB 在当前时间点的状态。

2. 发送:Raft Leader 通过网络把快照发送给 Follower。

3. 接收:Raft Follower 接收快照并暂时存储。

4. 应用:Raft Follower 将快照应用到 Raft 状态机和 RocksDB 数据中。

以下图表更详细地描述了快照过程:



图 2. TiKV 中的 Raft 快照过程



本文将详细介绍 Raft 快照实现的原理和流程,这个部分将介绍核心环节的设计思路,在代码路径详解中我们会逐步介绍细节。

快照元数据和数据的分离

Raft 消息 (protobuf definition:https://github.com/pingcap/kvproto/blob/df42997c2c57536219c67253966ede4d61d25757/include/eraftpb.proto#L77) 有一个 snapshot 字段,可以存储快照数据和元数据。一个简单的做法是将快照数据嵌入到 Raft 消息中,通过 Raft Peer 之间的标准通信通道传输(如图顶部的消息队列所示)。但问题在于 Raft 快照比其它消息大得多,将快照数据放入 Raft 消息中可能会阻塞正常的 Raft 消息处理逻辑。

因此,TiKV 中的一个设计选择是,Raft 快照消息(表示为 MsgSnapshot)仅包含快照的元数据。实际的快照数据作为文件保存在磁盘上。快照文件以及快照消息通过专用的 gRPC 流连接由 Snap Worker 发送(如步骤 7 和 8 所示)。使用 gRPC 流可以将数据分成更小的块以进行高效传输。

从 ApplyFsm 调度RegionTask::Gen

Region Worker 负责生成和应用快照数据。例如,在快照的应用过程中,PeerStorage 调度了一个 RegionTask::Apply 任务给 Region Worker(步骤 11)。但是快照的生成过程略有不同,尽管 PeerStorage 依然是整个过程的发起点,但是 RegionTask::Gen 任务是通过 ApplyFsm 来调度的(步骤 2 和 3)。为什么 PeerStorage 不直接进行 RegionTask::Gen 的调度呢?

这是为了控制快照生成的时间点,让快照尽可能地包含最新的数据。在 Raft 批处理系统中,Raft 信息是分批处理的。同一批信息中可能同时包含快照请求和新的写入请求,而我们希望快照在同批次的写入都完成之后生成。因此,Raft 批处理系统会先把同批次的所有写入任务发给 Apply 批处理系统处理,然后再分派快照任务(ApplyTask::Snapshot)。这样,因为 ApplyFsm 是对任务依次进行处理的,当它处理到快照任务的时候,同批次的写入已经完成。



我们将逐步介绍不同阶段的代码路径,各步骤与上图一一对应。不过这里包含的代码片段很简化,省略了许多细节,仅用于展示大致的流程。代码基于 TiKV 8.2.0 版本。

快照生成

步骤 1: GenSnapTask

TiKV 中实现 Raft 共识协议的是 raft-rs 模块 ( https://github.com/tikv/raft-rs ),快照过程在该模块中发起。在 raft-rs 中,Leader 对每一个 Follower 维护一个 Progress 对象,其中记录了该 Follower 所需要的下一个日志索引(pr.next_idx)。Raft leader 在 maybe_send_append 中处理某个 Follower 的 Append RPC 的发送,如果它无法获取前置日志(pr.next_idx - 1)的任期(用于 Append 过程的匹配校验),则需要发送快照。此时会调用 prepare_send_snapshot 函数,触发快照过程。

raft-rs: src/raft.rs
impl RaftCore {
fn maybe_send_append(..., pr: &mut Progress, ) {
...
let term = self.raft_log.term(pr.next_idx - 1);
match (term, ents) {
...
_ => {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return false;
}
}
}
}
fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
// ...
}
}
raft-rs: src/raft_log.rs
impl RaftLog {
pub fn snapshot(&self, request_index: u64, to: u64) -> Result
// ...
self.store.snapshot(request_index, to)
}
}

如上所示,快照过程经过若干调用后来到 Storage trait 的 snapshot 方法(定义:https://github.com/tikv/raft-rs/blob/2aefbf627f243dd261b7585ef1250d32efd9dfe7/src/storage.rs#L159)。在 TiKV 中,Storage trait 的实现是 PeerStorage,其 snapshot 实现如下:

components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
pub fn snapshot(&self, request_index: u64, to: u64) -> raft::Result
// ...
let task = GenSnapTask::new(...);
let mut gen_snap_task = self.gen_snap_task.borrow_mut();
*gen_snap_task = Some(task);
Err(raft::Error::Store(
raft::StorageError::SnapshotTemporarilyUnavailable,
))
}
}

构建了一个 GenSnapTask 并将其设置在 gen_snap_task 字段,然后它会返回一个 SnapshotTemporarilyUnavailable 错误,这个错误意味着快照正在生成过程中。在之后的过程中,函数会在 Raft 协议的每次心跳时被重新调用。如果快照生成未完成,它会继续返回 SnapshotTemporarilyUnavailable。当快照生成完毕后(步骤 5),snapshot() 的调用就会返回 Ok。

PeerStorage::snapshot

snapshot()

步骤 2: ApplyTask::Snapshot

函数检查PeerStorage 的 字段,并将任务发送给 ApplyFsm。如前所述,PeerFsm 会先给 ApplyFsm 发送该批次中所有的写入任务(见 函数),再发送快照任务。

Peer::handle_raft_ready_append

gen_snap_task

handle_raft_committed_entries

components/raftstore/src/store/peer.rs
impl Peer {
pub fn handle_raft_ready_append(...){
// ...
if !ready.committed_entries().is_empty() {
self.handle_raft_committed_entries(ctx, ready.take_committed_entries());
}
if let Some(mut gen_task) = self.mut_store().take_gen_snap_task() {
ctx.apply_router.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
}
}
fn handle_raft_committed_entries
let mut apply = Apply::new(...)
ctx.apply_router.schedule_task(self.region_id, ApplyTask::apply(apply));
}
}

步骤 3: RegionTask::Gen

在 ApplyFsm::handle_snapshot 函数中处理快照任务。它将快照任务转换为 RegionTask::Gen ,并发送给 Region Worker。

ApplyFsm

components/raftstore/src/store/fsm/apply.rs
impl ApplyFsm {
fn handle_tasks() {
loop {
match msg {
Msg::Snapshot(snap_task) => self.handle_snapshot(..., snap_task),
}
}
}
fn handle_snapshot(..., snap_task: GenSnapTask) {
snap_task.generate_and_schedule_snapshot()
}
}
impl GenSnapTask {
pub fn generate_and_schedule_snapshot(){
let snapshot = RegionTask::Gen {...}
region_sched.schedule(snapshot)
}
}

步骤 4 和 5: do_snapshot() and notify

Region Runner 定义了 Region 任务的处理逻辑。一系列函数调用到达 ,由它完成实际的快照生成工作,包括从 RocksDB 扫描 Region 的数据并写入 SST 文件。注意,快照生成工作(ctx.handle_gen)是在一个单独的线程池中,主要的考虑是因为它耗时较长,避免阻塞其它任务。

do_snapshot

components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
fn run(&mut self, task: Task
match task {
Task::Gen {...} => {
let ctx = SnapGenContext {...}
self.pool.spawn(async move {
ctx.handle_gen(...)
}
}
}
}
}
impl SnapGenContext {
fn generate_snap( ... ) {
let snap = box_try!(store::do_snapshot::
notifier.try_send(snap)
}
fn handle_gen( ... ) {
self.generate_snap(...)
}
}

生成完成后,notifier.try_send(snap) 将快照生成结果发送到一个通道,结果将使 PeerStorage::snapshot() 在下一次调用中返回 Ok。

快照发送

步骤 6 和 7: MsgSnapshot and send_snap()

成功后返回一个 Snapshot 结果。不过这个快照只包含元数据,快照的数据依然以 SST 文件的形式存储在磁盘上。

PeerStorage::snapshot()

src/server/raft_client.rs
impl AsyncRaftSender {
fn fill_msg(&mut self, ctx: &Context<'_>) {
// ...
if msg.get_message().has_snapshot() {
self.send_snapshot_sock(msg);
continue;
}
// ...
}
fn send_snapshot_sock(&self, msg: RaftMessage) {
if let Err(e) = self.snap_scheduler.schedule(SnapTask::Send {...}) {
// ...
}
}
}

拦截快照消息并将其转换为快照发送任务。任务被发送到 Snap Scheduler,由 Snap Worker 来处理。

AsyncRaftSender

src/server/snap.rs
impl Runnable for Runner {
fn run(&mut self, task: Task) {
match task {
Task::Send { addr, msg, cb } => {
let send_task = send_snap(...);
let task = async move {
let res = match send_task {
Err(e) => Err(e),
Ok(f) => f.await,
};
// ...
}
self.pool.spawn(task);
}
}
}
}
pub fn send_snap(...) {
// ...
}

Snap Runner 中定义了 Snap Worker 在处理不同任务时的处理逻辑,对于快照发送任务(Task::Send),Snap Runner 生成一个新的异步任务来运行 send_snap 函数。send_snap 通过打开一个新的 gRPC 流连接来传输快照消息及快照数据。

快照接收

步骤 8 和 9: recv_snap() and MsgSnapshot

在接收端,TiKV 实例看到传入的 gRPC 请求,通过调度一个 recv snap 任务来将请求转发给 Snap Worker。

src/server/service/kv.rs
impl Tikv for Service {
fn snapshot(...) {
let task = SnapTask::Recv { stream, sink };
if let Err(e) = self.snap_scheduler.schedule(task) {...}
}
}

Snap Worker 在 recv_snap 函数中接收快照元数据和内容。

src/server/snap.rs
impl Runnable for Runner {
fn run(&mut self, task: Task) {
match task {
Task::Recv { ... } => {
let task = async move {
let result = recv_snap(...).await;
}
self.pool.spawn(task);
}
}
}
}
fn recv_snap(...) {
let mut context = RecvSnapContext::new(head, &snap_mgr)?;
while let Some(item) = stream.next().await {
// ...
}
context.finish(raft_router)
}

快照接收后,context.finish(raft_router) 将快照消息发送到 Raftstore 以触发快照的应用。

快照应用

步骤 10 到 12: apply_snapshot()和 apply_snap()

快照在不同层级和不同地方被应用:

  1. 快照信息被 Raftstore 收到之后,会调用 raft_rs 的 step 函数。经过一系列的调用,到达 Raft::handle_snapshot ,在该函数中恢复 Raft 状态机的日志和配置。

raft-rs: src/raft.rs
impl Raft {
fn handle_snapshot(&mut self, mut m: Message) {
if self.restore(m.take_snapshot()) {...}
}
pub fn restore(&mut self, snap: Snapshot) -> bool {
...
self.raft_log.restore(snap);
}
}

  1. Raftstore 在对 ready 处理时 Peer::handle_raft_ready_append 会调用 函数,该函数会调用PeerStorage::apply_snapshot 来更新 Peer 的状态。在 Snapshot 被持久化之后,PeerStorage::on_persist_snapshot 函数会被调用,它会进一步调用PeerStorage::persist_snapshot 将快照应用任务发送给 Region Worker。
  2. PeerStorage::handle_raft_ready

components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
pub fn apply_snapshot() {...}
pub fn persist_snapshot(&mut self, res: &PersistSnapshotResult) {
self.schedule_applying_snapshot();
}
pub fn schedule_applying_snapshot(&mut self) {
let task = RegionTask::Apply {}
if let Err(e) = self.region_scheduler.schedule(task) {...}
}
}

  1. Region Worker 对快照应用任务进行处理,经过一系列调用会来到 Runner::apply_snap,它会将 Region 中的数据更新为快照中的数据。

components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
fn run(&mut self, task: Task
match task {
Task::Apply { .. } => {
self.pending_applies.push_back(task);
self.handle_pending_applies(false);
}
}
}
fn handle_pending_applies(&mut self, is_timeout: bool) {
while !self.pending_applies.is_empty() {
if let Some(Task::Apply { region_id, .. }) = self.pending_applies.front() {
if let Some(Task::Apply {}) = self.pending_applies.pop_front() {
self.handle_apply(region_id, peer_id, status);
}
}
}
}
fn handle_apply(&mut self, region_id: u64, peer_id: u64, status: Arc
match self.apply_snap(region_id, peer_id, Arc::clone(&status)) {...}
}
fn apply_snap(&mut self, region_id: u64, peer_id: u64, abort: Arc
// ...
}
}

) -> Result<()> {



以上便是 TiKV 中与快照相关的代码路径概览。希望通过本文的介绍,能帮助读者更深入地理解 TiKV 中的 Raft 快照机制及其实现细节,从而更有效地进行源码阅读和学习。

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
张劲松、王毅,被开除党籍

张劲松、王毅,被开除党籍

鲁中晨报
2024-11-22 23:24:19
山姆报警后网友发现打包哥又去了,这次还带上了一家三口

山姆报警后网友发现打包哥又去了,这次还带上了一家三口

映射生活的身影
2024-11-23 18:21:40
加沙面饼店缺粮闭店5天 开门后排队人山人海

加沙面饼店缺粮闭店5天 开门后排队人山人海

看看新闻Knews
2024-11-23 20:36:09
英媒:它20分钟内可打击欧洲任一城市

英媒:它20分钟内可打击欧洲任一城市

鲁中晨报
2024-11-24 07:28:04
最能打的医生!石铭在UFC澳门站暴力KO对手,主业竟是中医针灸师

最能打的医生!石铭在UFC澳门站暴力KO对手,主业竟是中医针灸师

直播吧
2024-11-24 00:50:25
输给利物浦就退出争冠?瓜迪奥拉:是的,如果利物浦赢了就会这样

输给利物浦就退出争冠?瓜迪奥拉:是的,如果利物浦赢了就会这样

直播吧
2024-11-24 07:25:28
李嘉诚:一个月挣三千块钱,你再怎么节约也只有三千

李嘉诚:一个月挣三千块钱,你再怎么节约也只有三千

清风拂心
2024-11-23 17:20:03
无视通缉令,清除卡西姆!以军导弹在民众眼前拦腰斩断大厦

无视通缉令,清除卡西姆!以军导弹在民众眼前拦腰斩断大厦

大风文字
2024-11-24 09:50:55
克钦独立军准备掀桌子,彻底跟邻国东方大国顶着来 !

克钦独立军准备掀桌子,彻底跟邻国东方大国顶着来 !

星空上的旧梦
2024-11-23 14:46:15
法国就乌克兰发射洲际弹道导弹:如果属实,则“极其严重”

法国就乌克兰发射洲际弹道导弹:如果属实,则“极其严重”

桂系007
2024-11-21 23:59:02
中国恢复对日免签,日本反应热烈

中国恢复对日免签,日本反应热烈

参考消息
2024-11-23 18:10:12
朝军首次参战出现伤亡,乌军缴获朝鲜73机枪:但这次朝军收获巨大

朝军首次参战出现伤亡,乌军缴获朝鲜73机枪:但这次朝军收获巨大

现代小青青慕慕
2024-11-24 05:24:46
高通称2024财年从华为产品获40亿元授权费,高管曾喊话“争取双方合作”|硅基世界

高通称2024财年从华为产品获40亿元授权费,高管曾喊话“争取双方合作”|硅基世界

钛媒体APP
2024-11-23 13:43:22
上海品质小区楼里恶臭阵阵,夹层竟藏了上万只蛆

上海品质小区楼里恶臭阵阵,夹层竟藏了上万只蛆

看看新闻Knews
2024-11-23 21:28:53
2轮1分!巴萨噩梦4分钟:染红+丢2球,揪出头号罪人 皇马翻盘良机

2轮1分!巴萨噩梦4分钟:染红+丢2球,揪出头号罪人 皇马翻盘良机

风过乡
2024-11-24 07:03:00
74岁张艺谋看圆明园铜首,穿厚底鞋打扮时髦,何超琼亲自陪同!

74岁张艺谋看圆明园铜首,穿厚底鞋打扮时髦,何超琼亲自陪同!

古希腊掌管月桂的神
2024-11-23 22:11:48
国家队暂停托市!11月24日,A股牛市行情走完了?

国家队暂停托市!11月24日,A股牛市行情走完了?

风口招财猪
2024-11-24 04:02:28
季羡林:没必要感激任何人,养猪,只是为吃猪肉,为了利益而忙碌

季羡林:没必要感激任何人,养猪,只是为吃猪肉,为了利益而忙碌

清风拂心
2024-11-23 16:30:03
灰熊力克公牛:小皮蓬30+10生涯首次 拉文29分武切26分

灰熊力克公牛:小皮蓬30+10生涯首次 拉文29分武切26分

醉卧浮生
2024-11-24 11:20:19
奶奶为除虱子用敌敌畏给孙女洗头,担心效果不佳还用毛巾包头致中毒!

奶奶为除虱子用敌敌畏给孙女洗头,担心效果不佳还用毛巾包头致中毒!

北青网-北京青年报
2024-11-21 16:41:10
2024-11-24 12:32:49
PingCAP
PingCAP
分布式数据库TiDB背后团队
510文章数 624关注度
往期回顾 全部

科技要闻

“这是中国的非凡机遇,德日远远落后了”

头条要闻

广东知名健身机构关门倒闭 有人剩近10万元未消费

头条要闻

广东知名健身机构关门倒闭 有人剩近10万元未消费

体育要闻

德约科维奇携手穆雷 征战新赛季

娱乐要闻

一个月被爆两次,王宝强得罪谁了?

财经要闻

2025年全球股、债、商、汇怎么走?

汽车要闻

尊界S800首张官图发布 双色车身"尊的"很亮

态度原创

房产
游戏
教育
艺术
时尚

房产要闻

丁村迎来大动作!首宗、百亩城更宅地挂出!楼面价2367元/㎡!

《虚冲》12月Steam抢先体验 刀剑动作恐怖冒险

教育要闻

广西一小学生吃零食被老师扇脸、录视频,还要说谢谢,官方发通报

艺术要闻

故宫珍藏的墨迹《十七帖》,比拓本更精良,这才是地道的魏晋写法

中年女人穿衣要讲究,这些实用的冬季穿搭,优雅高级还显贵

无障碍浏览 进入关怀版