分布式数据库的进度管理:TiDB 备份恢复工具 PiTR 的原理与实践

导读

对于一款企业级数据库产品而言,数据的安全性和可恢复性是至关重要的。PiTR(Point in Time Restore)作为 TiDB 备份工具的核心功能之一,提供了一种精细的数据恢复能力,允许用户将数据库集群恢复到过去的任意时间点。这种能力对于处理数据损坏、误操作或数据丢失等灾难性事件至关重要。

对于分布式系统而言,想实现精确的进度管理是十分复杂的,本文将深入解析 PiTR 在 TiDB 的分布式架构中的实现,包括其在 TiKV 层的备份流程,以及 TiDB 如何管理这些备份任务的进度。

希望本文能够帮助开发者和数据库管理员更好地理解 PiTR 的工作机制,有效地利用这一功能加固数据库基础设施。

PiTR 是 TiDB 备份工具中必不可少的一部分。如果说全量备份帮助我们获得了将集群回退到某个时间节点的能力,那么 PiTR 则更加精细地备份了集群的每一次写入,并且允许我们回到备份开始后的任意一个时点。

直觉上,当你启动一个 PiTR 任务,等于告诉集群:我需要知道从当前时间节点之后的全部变化。对于一个分布式数据库而言,这并不是一个简单的工作。

目前 TiDB 的数据存储结构

上图展示了目前 TiDB 的数据存储结构。用户以表和行的形式写入数据,每一行数据都会以一个键值对的形式存储在 TiKV 中,每一个 TiKV 又会被逻辑地划分为多个 region。

由于 TiDB 分布式写入的实质,各个 Region 的数据分布在不同宿主机上,也不存在一个确切统一的写入时点。所以我们需要找到一种方法分别管理每个 region 的写入工作,并且需要提供一个整体进度。在接下来的内容中,我们将详细展开 TiDB 的 PiTR 进度管理流程。从单个 TiKV 开始,逐步推进到整个集群。

TiKV 侧备份流程

如果我们希望管理备份工作的具体进度,首先需要了解的是,备份工作究竟是怎样完成的。在 TiDB 的实践中,PiTR 是一个分布式过程,每个 TiKV Server 自行记录备份数据,并将数据发送到远端储存,大致上按照下图所示的流程工作。

TiKV 侧备份流程

在 TiKV server 初始化期间,会同时(先后)初始化 BackupStreamObserver[1] 和 Endpoint[2] 两个组件。它们共用了同一个 scheduler(backup_stream_scheduler[3],通过向 scheduler 发送 Task 的方式进行互相沟通。

BackupStreamObserver 会实时监听 Raft 状态机的写入情况。其重点在于 on_flush_applied_cmd_batch()[4] 接口。这个接口会在 Raft 状态机 apply 时被调用,将 Raft 命令打包为 BatchEvent,然后作为一个任务发送给 scheduler。对于 PiTR 而言,这个任务被称为 Task::BatchEvent[5]。

pub struct CmdBatch {
    pub level: ObserveLevel,
    pub cdc_id: ObserveId,
    pub rts_id: ObserveId,
    pub pitr_id: ObserveId,
    pub region_id: u64,
    pub cmds: Vec<Cmd>,
}

可以看出,BatchEvent 的实质是一系列 Raft 命令的拷贝。PiTR 在备份时记录这些命令,并在恢复时重放,以实现日志备份功能。

而 Endpoint 负责沟通 TiKV Server 和外部储存。它会在启动之后进入一个循环,检查当前 scheduler 中是否包含新的任务,匹配并执行不同的函数。其中,我们需要关注的是 Task::BatchEvent,也就是从 Observer 发送来的写入数据。当 endpoint 匹配到 Task::BatchEvent,它会执行 backup_batch()[6] 函数开始备份这些键值对。

在这一步,Endpoint 先对 CmdBatch 进行简单检查,然后将它发往router.on_events()[7],并开始异步地等待结果。

Router 的作用是将写入操作按照 range 拆分,以提高并发度。每个 range 的写入并不是即时的,我们会在内存中储存一个临时文件,用于暂时存储从 raft store 更新的信息。当内存中储存的临时文件大小超出上限,或者超过指定刷盘间隔,我们才会真正将储存在临时文件中的数据写入远端储存,并视为完成了一次(部份)备份。目前 BackupStreamConfig 的默认设置中,max_flush_interval 为 3 分钟。

impl Default for BackupStreamConfig {
    fn default() -> Self {
        // ...
        Self {
            min_ts_interval: ReadableDuration::secs(10),
            max_flush_interval: ReadableDuration::minutes(3),
        // ...
        }
    }
}

当满足刷盘条件后,我们会跳转到 endpoint.do_flush() [8] 函数。并在这里完成将备份文件刷盘的逻辑。当这个函数完成之后,备份数据已经被写入远端存储,可以认为备份到此告一段落。此处正是汇报备份进度的最佳时刻。在并不令人注意的角落,这个任务是由一个回调完成的:flush_ob.after() [9]。

       async fn after(&mut self, task: &str, _rts: u64) -> Result<()> {
        let flush_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::FlushWith(
            std::mem::take(&mut self.checkpoints),
        )); //Update checkpoint
        try_send!(self.sched, flush_task);

        let global_checkpoint = self.get_checkpoint(task).await?;
        info!("getting global checkpoint from cache for updating."; "checkpoint" => ?global_checkpoint);
        self.baseline
            .after(task, global_checkpoint.ts.into_inner()) //update safepoint
            .await?;
        Ok(())
    }

这个回调函数做了两件事,更新 service safe point 和 store checkpoint。它们是什么,又有什么用呢?

从检查点(Checkpoint)到全局检查点(Global Checkpoint)

上文中我们阅读了 PiTR 备份流程的细节。现在,我们可以回到正题,反思整个流程。

首先我们已经明确,对于 TiDB 这样的分布式数据库,所有的数据都储存在一个个单独的 TiKV 节点上。在 PiTR 流程中,这些 TiKV 也是各自将数据打包成文件,发送到远端储存上。这引出了一个重要的问题:如何进行进度管理?

为了确保备份进度的有效管理,我们需要跟踪每个 TiKV 节点上的数据备份进度。对于单个 Region,可以通过记录已备份数据的时间戳来实现进度管理:当数据被刷盘时,记录当前时间戳,这个时间戳就是该 Region 完成备份的最小时间节点,即 Checkpoint。

同时,我们需要了解到,需要备份的数据并不会永恒的保留。由于 MVCC 机制,每次数据修改都会产生一个新版本并保留旧版本,旧版本可以用于历史查询和事务隔离。随着时间的推移,这些历史数据会不断累积,因此需要通过 GC 机制来回收和清理旧版本,释放存储空间并提高性能。

我们需要确保在备份(Flush)完成之前,备份数据不会被 GC 清除。所以此处引入一个指标,通知 GC 可以安全清除的数据时间戳。这就是Service Safepoint。

值得注意的是,以上的讨论只是单个 region 的进度管理,一个集群中会同时存在多个 region,所以我们需要设计一个指标便于管理整个集群的备份进度,它被称之为Global Checkpoint。

在实践中,Global Checkpoint 是所有 TiKV Checkpoint 的最小值[10],这保证了所有 region 的进度都至少不小于这个时间节点。或者说,在这个时间节点之前,整个集群的数据都完成备份了。

而这个汇总所有 TiKV 进度并计算 Global Checkpoint 的工作,是在 TiDB 完成的。

TiDB 侧进度管理

TiDB 侧进度管理

既然我们了解了 TiKV 侧的备份进度管理流程。让我们转头看看 TiDB 的情况。

在 TiDB 侧,负责这项工作的组件被称为 CheckpointAdvancer [11]。它的本质是一个外挂在 TiDB 主程序上的守护进程,会随着时间执行一些周期性操作。它的工作主要包括两部分:

  1. 订阅更新来自 TiKV 的 FlushTSO 更新。
  2. 处理可能的错误并计算 Global Checkpoint。
  3. 计算总体更新进度并汇报给 PD。

具体地,在 CheckPointAdvancer 中有一个名为 FlushSubscriber[12] 的字段,TiDB 就是通过它监听 TiKV 的刷盘操作和 checkpoint 推进。FlushSubscriber 维持一个 gRPC 流,持续监听[13] 不同 range 的 checkpoint 并将其记录下来。随后通过 channel 发送给 advancer。

而 advacner 接收到这些 checkpoint 之后,会将它们放置于 checkpoints[14] 字段中。当接收到来自 TiKV 的进度信息之后,advancer 会尝试开始更新 Global Checkpoint。作为一个守护进程,更新过程并不是实时的,而是随着主进程调用它的 tick()[15] 方法间歇性完成。

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
        //...        
        var errs error
        cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
        defer cancel()
        err := c.optionalTick(cx)
        if err != nil {
            // ...  
        }

        err = c.importantTick(ctx)
        if err != nil {
            // ...
        }

        return errs
}

这个过程实际上被分为了两个部分,optionalTick()[16] 和 importantTick() [17]。

optionalTick 主要负责与 FlushSubscriber 沟通,获取来自 TiKV 的进度更新。由于单个 TiKV 的 Checkpoint 并不一定会推进,所以取名为 optionalTick。一旦捕获到 TiKV FlushTSO 的更新,便会在这里记录并试图推进全局检查点。

而 importantTick 则负责管理全局进度。确认进度更新后,这里会产生新的 Global Checkpoint 和 Service Safepoint[18]。

这个行为是存在风险的。如果某个 TiKV 的 Checkpoint 因为种种原因一直没有成功推进,就会阻塞住 Global Checkpoint 的推进,进而可能阻塞住 GC,无法正确清除已经完成备份的冗余数据。在最糟糕的情况下,某个 TiKV 陷入了不可自动恢复的错误。它有可能会永远阻碍 GC 进度,造成对整体系统的更大破坏。

因此,importantTick 会检查[19] checkpoint 距离上次更新的时间差。如果某个 Checkpoint 长时间没有推进,这个备份任务会被标记为异常状态[20]。随后,advancer 会自动暂停这个任务,等待管理员手工运维的介入。

        isLagged, err := c.isCheckpointLagged(ctx)
        if err != nil {
                return errors.Annotate(err, "failed to check timestamp")
        }
        if isLagged {
                err := c.env.PauseTask(ctx, c.task.Name)
                if err != nil {
                        return errors.Annotate(err, "failed to pause task")
                }
                return errors.Annotate(errors.Errorf("check point lagged too large"), "check point lagged too large")
        }

此后,advancer 并不会停止,它只是跳过 [21] 了异常任务的 checkpoint 更新。如果 PD 恢复了这个任务,会向 advancer 发送信号[22],advancer 便可以回到正常的 tick 流程中。

此处介绍的异常处理机制是完全防卫性质的。它只能识别异常状态的存在,却无法指出问题的原因,最终还需要管理员手动介入。或许在未来,我们能够实现 PiTR 的自动运维,当 checkpoint 恢复推进之后,可以自动重启这个任务。

参考资料

[1]

BackupStreamObserver: https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/observer.rs#L94

[2]

Endpoint: https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/endpoint.rs#L1414

[3]

backup_stream_scheduler: https://github.com/tikv/tikv/blob/release-8.0/components/server/src/server.rs#L891

[4]

on_flush_applied_cmd_batch(): https://github.com/tikv/tikv/blob/release-8.0/components/raftstore/src/coprocessor/mod.rs#L581

[5]

Task::BatchEvent: https://github.com/tikv/tikv/blob/release-8.0/components/raftstore/src/coprocessor/mod.rs#L504

[6]

backup_batch(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/endpoint.rs#L479

[7]

router.on_events(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/router.rs#L595

[8]

endpoint.do_flush(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/endpoint.rs#L825

[9]

flush_ob.after(): https://github.com/tikv/tikv/blob/release-8.0/components/backup-stream/src/checkpoint_manager.rs#L526

[10]

最小值: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/advancer.go#L295

[11]

CheckpointAdvancer: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L57

[12]

FlushSubscriber: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/flush_subscriber.go#L27

[13]

持续监听: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/flush_subscriber.go#L250

[14]

checkpoints: https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/advancer.go#L56

[15]

tick(): https://github.com/pingcap/tidb/blob/release-8.0/br/pkg/streamhelper/advancer.go#L645

[16]

optionalTick(): https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L622

[17]

importantTick(): https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L587

[18]

Service Safepoint: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L605

[19]

检查: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L594

[20]

标记为异常状态: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L598

[21]

跳过: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L646

[22]

发送信号: https://github.com/pingcap/tidb/blob/master/br/pkg/streamhelper/advancer.go#L465

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/890236.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

1.MySQL存储过程基础(1/10)

引言 数据库管理系统&#xff08;Database Management System, DBMS&#xff09;是现代信息技术中不可或缺的一部分。它提供了一种系统化的方法来创建、检索、更新和管理数据。DBMS的重要性体现在以下几个方面&#xff1a; 数据组织&#xff1a;DBMS 允许数据以结构化的方式存…

8.优化存储过程的性能(8/10)

优化存储过程的性能 1.引言 存储过程是数据库系统中预先编写好的SQL语句集合&#xff0c;它们被保存在数据库服务器上&#xff0c;可以在需要时被调用执行。存储过程的使用可以提高数据库操作的效率&#xff0c;减少网络通信&#xff0c;并且可以封装复杂的逻辑&#xff0c;使…

无人机之交互系统篇

一、系统构成 无人机交互系统通常由多个子系统组成&#xff0c;包括但不限于&#xff1a; 多模式人机交互装置&#xff1a;这是人机交互系统的基础层&#xff0c;通常包括计算机、局域网、传感器等设备&#xff0c;用于实现操作员与无人机之间的数据交互和指令传递。例如&…

新型物联网电力数据采集器 智能网关通讯协议有哪些?

随着智能化技术的快速发展&#xff0c;电气监测与管理在各个域的应用愈发重要&#xff0c;在物联网&#xff08;IoT&#xff09;应用的发展中&#xff0c;网关扮演着至关重要的角色。它作为连接设备与云平台或数据中心的桥梁&#xff0c;负责数据的收集、处理和传输。网关不仅支…

鸿蒙开发之ArkUI 界面篇 三十四 容器组件Tabs一

当页面较多时&#xff0c;可以通过Tabs组件进行展示&#xff0c;如下图&#xff0c;支持顶部、底部、侧边栏 Tabs页面需两个组件&#xff0c;分别是TabContent和TabBar。TabContent必须有&#xff0c;TabBar是导航标题&#xff0c;可以没有也能显示&#xff0c;只是没有标题提示…

uniapp__微信小程序使用秋云ucharts折线图双轴

1、子组件 <template><view class"charts-box"><qiun-data-charts type"line":opts"computedOpts":chartData"chartData"/></view> </template><script> export default {props: {chartData: {t…

【论文阅读】Learning a Few-shot Embedding Model with Contrastive Learning

使用对比学习来学习小样本嵌入模型 引用&#xff1a;Liu, Chen, et al. “Learning a few-shot embedding model with contrastive learning.” Proceedings of the AAAI conference on artificial intelligence. Vol. 35. No. 10. 2021. 论文地址&#xff1a;下载地址 论文代码…

沉浸式娱乐新纪元,什么是5G+实时云渲染VR大空间解决方案?

近年来&#xff0c;虚拟现实&#xff08;VR&#xff09;技术在娱乐、教育、医疗等多个领域展现出巨大的潜力&#xff0c;尤其是VR大空间体验&#xff0c;更是以其沉浸式和互动性的特点&#xff0c;迅速成为市场的新宠。据Statista数据显示&#xff0c;2023年&#xff0c;全球虚…

【笔记】Day2.5.1查询运费模板列表(未完

&#xff08;一&#xff09;代码编写 1.阅读需求&#xff0c;确保理解其中的每一个要素&#xff1a; 获取全部运费模板&#xff1a;这意味着我需要从数据库中查询所有运费模板数据。按创建时间倒序排序&#xff1a;这意味着查询结果需要根据模板的创建时间进行排序&#xff0…

【Java】集合中单列集合详解(一):Collection与List

目录 引言 一、Collection接口 1.1 主要方法 1.1.1 添加元素 1.1.2 删除元素 1.1.3 清空元素 1.1.4 判断元素是否存在 1.1.5 判断是否为空 1.1.6 求取元素个数 1.2 遍历方法 1.2.1 迭代器遍历 1.2.2 增强for遍历 1.2.3 Lambda表达式遍历 1.2.4 应用场景 二、…

【D3.js in Action 3 精译_029】3.5 给 D3 条形图加注图表标签(上)

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一部分 D3.js 基础知识 第一章 D3.js 简介&#xff08;已完结&#xff09; 1.1 何为 D3.js&#xff1f;1.2 D3 生态系统——入门须知1.3 数据可视化最佳实践&#xff08;上&#xff09;1.3 数据可…

aws(学习笔记第二课) AWS SDK(node js)

aws(学习笔记第二课) 使用AWS SDK&#xff08;node js&#xff09; 学习内容&#xff1a; 使用AWS SDK&#xff08;node js&#xff09; 1. AWS SDK&#xff08;node js&#xff09; AWS支持多种SDK开发(除了AWS CLI&#xff0c;还支持其他的SDK) AndroidPythonNode.js(Javas…

数据结构-C语言顺序栈功能实现

栈 栈&#xff1a;类似于一个容器&#xff0c;如我们生活中的箱子&#xff0c;我们向箱子里放东西&#xff0c;那么最先放的东西是最后才能拿出来的 代码实现 #include <stdio.h> #include <stdlib.h>#define MAX_SIZE 100typedef struct {int* base; // 栈底指针…

[Linux#65][TCP] 详解 延迟应答 | 捎带应答 | 流量控制 | 拥塞控制

目录 一、延迟应答 二、捎带应答 三. 流量控制 总结 四. 拥塞控制 1. 拥塞控制 2. 慢启动机制&#xff1a; 3.思考 4.拥塞避免算法 5. 快速恢复算法 一、延迟应答 1. 立即应答问题 接收数据的主机若立刻返回ACK应答&#xff0c;可能返回的窗口较小。例如&#xff1…

univer实现excel协同

快速入门 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><script src&q…

怎么看待数字化转型是大势所趋?

怎么看到数字化转型是大势所趋&#xff1f;下面我结合最新数据给大家讲明白这个事。 近日&#xff0c;我通过大量的数据相关性分析&#xff0c;有了一些关键发现。 【数字化转型】之所以势在必行&#xff0c;主要是因为数字化转型为各个国家数字经济发展提供了重要的参考依据。…

使用js和canvas实现简单的网页贪吃蛇小游戏

玩法介绍 点击开始游戏后&#xff0c;使用键盘上的↑↓←→控制移动&#xff0c;吃到食物增加长度&#xff0c;碰到墙壁或碰到自身就游戏结束 代码实现 代码比较简单&#xff0c;直接阅读注释即可&#xff0c;复制即用 <!DOCTYPE html> <html lang"en"…

SAP将假脱机(Spool requests)内容转换为PDF文档[RSTXPDFT4]

将假脱机(Spool requests)内容转换为PDF文档[RSTXPDFT4] 有时需要将Spool中的内容导出成PDF文件&#xff0c;sap提供了一个标准程序RSTXPDFT4可以实现此功能。 1, Tcode:SP01, 进入spool requests list 2, SE38 运行程序RSTXPDFT4 输入spool reqeust号码18680&#xff0c;然后…

im架构分享 即时通讯架构 即时消息 全球架构师峰会im分享 im分布式 企业级im架构 分布式im 即时通讯im架构

1. 之前收藏的淘宝李厉岷老师在全球架构师峰会上做的im技术分享&#xff0c;贴出来备注下。 2. 李老师infoQ主页链接&#xff1a; 李历岷 3. 文章&#xff1a; 电商IM消息系统架构演进_ArchSummit_李历岷_InfoQ精选文章 4. ppt下载地址 &#xff08;注&#xff1a;同期还有…

GAN(Generative Adversarial Nets)

GAN(Generative Adversarial Nets) 引言 GAN由Ian J. Goodfellow等人提出&#xff0c;是Ian J. Goodfellow的代表作之一&#xff0c;他还出版了大家耳熟能详的花书&#xff08;Deep Learning深度学习&#xff09;&#xff0c;GAN主要的思想是同时训练两个模型&#xff0c;生成…