💻 Computer Basics
1、计算机网络
1.1 传输层:TCP和UDP
1.1.1 三次握手
1.1.2 四次挥手
1.1.3 流量控制
1.1.4 拥塞控制
1.1.5 TCP和UDP的区别
1.1.6 TCP如何保证传输的可靠性
1.1.7 TCP长连接和短连接
1.1.8 应用层提高UDP协议可靠性的方法
1.1.9 UDP和IP的首部结构
1.2 应用层:HTTP和HTTPS
1.2.1 HTTP和HTTPS的区别
1.2.2 GET和POST的区别
1.2.3 Session与Cookie的区别
1.2.4 从输入网址到获得页面的过程(越详细越好)
1.2.5 HTTP请求有哪些常见的状态码
1.2.6 什么是RIP,算法是什么
1.2.7 HTTP1.1和HTTP2.0的主要区别
1.2.8 DNS
1.2.9 HTTPS加密和认证过程
1.2.10 常见网络攻击
1.2.11 REST
1.3 计算机网络体系结构
1.4 网络层协议
1.4.1 IP地址的分类
1.4.2 划分子网
1.4.3 什么是ARP协议
1.4.4 NAT协议
2、操作系统
2.1 进程和线程
2.1.1 进程和线程的区别
2.1.2 进程间通信方式
2.1.3 同步原语
2.1.4 进程状态
2.1.5 进程调度策略
2.1.6 僵尸进程和孤儿进程
2.1.7 协程
2.1.8 异常控制流
2.1.9 IO多路复用
2.1.10 用户态和内核态
2.2 死锁
2.3 内存管理
2.3.1 分段与分页
2.3.2 虚拟内存
2.3.3 页面置换算法
2.3.4 局部性原理
2.3.5 缓冲区溢出
2.4 磁盘调度
-
+
tourist
register
Sign in
同步原语
## 为什么需要同步 1. 近年来,CPU 的核数在不断的增加,为了更充分地利用多核,应用程序需要**将待处理的数据进行划分**,从而能够**在同一时间分配任务到多个核心上并行处理**,**利用更短的时间完成计算**。 2. **为了保证共享资源状态的正确性**,**需要正确地在这些子任务之间进行同步**,具体实例如下: ![image-20220714162846345](https://notebook.ricear.com/media/202207/2022-07-14_162915_3805240.8343934082667143.png) 1. 假设我们有一个生产者与一个消费者,生产者与消费者共享一个大小为 5 的缓冲区用于交换数据,生产者不断地将生成的新数据放到缓冲区中,与此同时,消费者从缓冲区拿取并消耗这些数据。 2. 如上图所示,缓冲区 0、1、2 分别存放着生产者放入缓冲区的数据,生产者下一步产生的数据将放入 3 中,而消费者将从 0 中拿取数据进行处理。 3. 为了维护生产者和消费者的操作的正确性,我们需要满足以下两个前提条件: 1. **当缓冲区已满时**,**生产者应停止向缓冲区写入数据**,**避免数据被覆盖**。 2. **当缓冲区为空时**,**消费者应停止从缓冲区拿取数据**,**避免读取到无效数据**。 3. 为了正确、高效地解决这些同步问题,前人抽象出了一系列**同步原语**供开发者使用,下面将结合具体的场景介绍几种常见的同步原语的语义、使用方法、实现以及带来的问题。 ## 常见的同步原语 ### ⚔️ 互斥锁 #### 什么是临界区问题 1. 对于一个生产者一个消费者协同工作的问题,我们可以通过上面的两个前提条件来解决,但对于多个生产者同时将新产生的对象放入缓冲区的问题,则显得有些力不从心,具体实例如下: 1. 现在有两个生产者,分别为 $T_1$ 和 $T_2$,这两个生产者同时向缓冲区中放入新产生的数据,假设此时缓冲区空余位置为 2。 2. 此时如果不加干预,两个生产者在同一时刻尝试向缓冲区的 3 号位置来放入自己的数据,最终可能导致数据覆盖,出现错误。 ![image-20220714202636532](https://notebook.ricear.com/media/202207/2022-07-14_202705_8782440.9522137010941577.png) 2. 上述问题产生的根本原因在于对共享资源的竞争(本例中为共享缓冲区),**避免这一问题最直接的方法是确保同一时刻只有一个生产者能够对共享缓冲区进行操作**,这种**任意时刻最多只允许一个线程访问的方式被称为互斥访问**(Mutual Exclusion),而**保证互斥访问共享资源的代码区域被称为临界区**(Critical Section)。 3. **在同一时刻**,**至多只有一个线程可以执行临界区中的代码**,因此,**在临界区内**,**一个线程可以安全地对共享资源进行操作**。 4. 因此,如何通过设计协议来保证互斥访问临界区的问题就称为**临界区问题**。 #### 如何解决临界区问题 1. 为了解决临界区问题,我们需要设计一个协议来保证临界区的互斥性: 1. 我们可以将使用临界区的程序抽象成一个循环,并将这个循环分成四个部分: ![image-20220714211608809](https://notebook.ricear.com/media/202207/2022-07-14_211637_8823940.32907177620882644.png) 1. **申请进入临界区**:在执行临界区之前,线程必须申请并得到进入临界区的许可。 2. **临界区部分**。 3. **标示退出临界区**:在退出临界区之前,线程需要显式地标示临界区的结束。 4. **其他代码**:临界区之外的代码不涉及对共享资源的访问,这些代码被归为其他代码。 2. 四个部分中最为关键的是申请并得到进入临界区的许可与标示退出临界区这两个部分,我们需要针对他们设计一套满足以下条件的算法来保证程序的正确性: 1. **互斥访问**:在同一时刻,最多只有一个线程可以执行临界区。 2. **有限等待**:当一个线程申请进入临界区之后,必须在有限的时间内获得许可并进入临界区,不能无限等待。 3. **空闲让进**:当没有线程在执行临界区代码时,必须在申请进入临界区的线程中选择一个线程,允许其执行临界区代码,保证程序执行的进展。 #### 解决临界区问题的具体实现 ##### 硬件实现:关闭中断 1. 对于**单核**环境来说,我们可以通过**关闭中断**来解决临界区问题: 1. 在单核中关闭中断意味着**当前执行的线程不能被其他线程抢占**。 2. 因此若**在进入临界区之前关闭中断**,且**在离开临界区时再开启中断**,便能够避免当前执行临界区代码的线程被打断,保证任意时刻只有一个线程执行临界区。 2. 同时,在单核环境中,关闭中断满足解决临界区问题的三个条件: 1. 关闭中断可以**防止执行临界区的线程被抢占**,**避免多个线程同时执行临界区**,保证了**互斥访问**。 2. **有限等待依赖于内核中的调度器**,如果能保证**在有限时间内调度到该线程**,则该线程就可以**在有限时间内进入临界区**,达成有限等待的要求。 3. 每个线程在**离开临界区时都开启了中断**,**允许调度器调度到其他线程并执行**,因此某线程在执行时如需要进入临界区(意味着没有其他线程在执行临界区),可以关闭中断并执行临界区代码,从而达成了**空闲让进**的要求。 > 在多核环境中,关闭中断的方法不再适用,因为即使关闭了所有核心的中断,也**不能阻塞其他核心上正在运行的线程继续执行**,如果**多个同时运行的线程需要执行临界区**,关闭中断还是会出现临界区问题。 ##### 软件实现:皮特森算法 1. [皮特森算法](https://en.wikipedia.org/wiki/Peterson%27s_algorithm)是一种纯软件方法,可以解决**两个线程**的临界区问题,可以被用于多核 CPU 中。 2. 皮特森算法的具体实现原理如下: ![image-20220714214523910](https://notebook.ricear.com/media/202207/2022-07-14_214553_2614940.1897861751146217.png) 1. 皮特森算法中有两个重要的变量: 1. 第一个是全局数组 `flag`,他共有两个布尔成员(`flag[0]` 与 `flag[1]`),分别代表线程 0 与线程 1**是否尝试进入临界区**。 2. 第二个是全局变量 `turn`,如果两个线程都申请进入临界区,那么 `turn` 将**决定最终能进入临界区的线程编号**(因此其值只能为 0 或 1)。 2. 在应用程序开始时,`flag` 数组中的成员均被设置为 `FALSE`,表示**两个线程都没有申请进入临界区**,而变量 `turn` 无须进行初始化,其会**在线程申请进入临界区时被设置**。 3. 在皮特森算法中,线程在进入临界区之前必须达成以下**两个条件之一**(以线程 0 为例): 1. `flag[1] = FALSE`,即**线程 1 没有申请进入临界区**。 2. `turn = 0`,即**线程 1 也申请进入临界区**,**但 `turn` 决定线程 0 可以进入临界区**。 如果以上两个条件均不满足,则需要进行**循环等待**,**直到其中一个条件得到满足**。 4. 线程 0 在进入临界区之前,需要设置 `flag[0]` 为 `TRUE`,代表其尝试进入临界区,而在线程 0 检查是否可以进入临界区时**线程 1**只可能处于以下三种状态之一: 1. **准备进入临界区**(执行完第 2 行代码): 1. 由于线程 1 已经执行完了第 2 行代码,即标记 `flag[1]` 为 `TRUE`,此时两个线程均在申请进入临界区,根据之前的说明,这两个线程将依据变量 `turn` 的值从二者中选出一个先进入临界区,而另一个将继续循环等待(**互斥访问**)。 2. 这里线程都会将 `turn`**设置为对方而不是自己**,否则会出现两个线程同时进入临界区的情况,具体而言,线程 1 此时将 `turn` 设置为 0 而非 1,这样可以保证在线程 0 同时申请进入临界区时,线程 1 不能直接进入临界区,无论执行的先后顺序如何,`turn` 最终都会选择一个线程进入临界区(**空闲让进**)。 > 比如,若线程 0**先**执行了 `turn = 1` 的操作,而线程 1**后**执行了 `turn = 0` 的操作,那么 `turn` 的最终值为 0,此时线程 0 可以进入临界区,反之,线程 0 后执行更新操作时,线程 1 可以进入临界区,不会存在线程 0 与线程 1 都无法进入临界区的情况。 2. **在临界区内部**: 1. 此时由于线程 1 在临界区内,他不会再更新 `flag[1]` 与 `turn`,而由于线程 0 只能将 `turn` 值设 1,满足了线程 0**循环等待**的条件,保证了同一时刻只有一个线程执行临界区内的代码(互斥访问)。 3. **执行其他代码**: 1. 线程 1 在离开临界区时会设置自己的 `flag[1]` 为 `FALSE`,表示不在占有临界区,因此线程 0 可以进入临界区。 2. 如果线程 1 在线程 0 成功进入临界区之前又再次尝试进入临界区(第 2、3 行),此时由于线程 0 已经进入检查阶段(第 4 行),不会再更新 `turn` 的值,因此 `turn` 的值会被线程 1 更新为 0,表示线程 0 被允许进入临界区,不会出现无限等待的情况(**有限等待**)。 > 通过分析皮特森算法在不同执行情况下的行为,可以发现皮特森算法**满足之前定义的解决临界区问题的三个条件**。 3. 皮特森算法存在的缺陷如下: 1. 经典的皮特森算法只能应对**两个线程**的情况,而 Micha Hofri 于 1990 年[扩展了皮特森算法](https://doi.org/10.1145/90994.91002),使其能被使用于任意数量的线程。 2. 皮特森算法要求**访存操作严格按照程序顺序执行**,然而现代 CPU 为了达到更好的性能往往允许访存操作乱序执行,因此皮特森算法**无法直接在这些 CPU 上正确工作**。 ##### :atom_symbol: 软硬件协同:原子操作 ###### 什么是原子操作 1. 原子操作指的是 **不可被打断** 的一个或一系列操作,即要么这一系列指令都执行完成,要么这一系列指令一条都没有执行,不会出现执行到一般的状态。 2. 最常见的原子操作包括 [比较与交换](https://notebook.ricear.com/project-34/doc-529)(Compare And Swap, CAS)、**拿取并累加**(Fetch And Add, FAA)等: 1. **CAS**: 1. CAS 操作会比较地址 `addr` 上的值与期望值 `expected` 是否相等,如果相等则将 `addr` 上的值置换为新的值 `new_value`,否则不进行置换,最后 CAS 将 **返回 `addr` 存放的旧值**。 ~~~c++ int CAS(int *addr, int expected, int new_value) { int tmp = *addr; if (*addr == expected) {*addr = new_value;} return tmp; } ~~~ 2. **FAA**: 1. FAA 操作会读取 `addr` 上的旧值,将其加上 `add_value` 后重新存回该地址,最后 **返回该地址上的旧值**。 ~~~c++ int FAA(int *addr, int add_value) { int tmp = *addr; *addr = tmp + add_value; return tmp; } ~~~ > 以上代码只是用来展现 CAS 和 FAA 的操作逻辑,代码本身并不具备原子性。 ###### 硬件平台实现 > 为了在硬件层面支持这些原子操作,不同的硬件平台提供了不同的解决方案。 1. **Intel**: 1. 在 Intel 平台,应用程序主要通过使用 **带 `lock` 前缀的指令** 来保证操作的原子性。 2. 下面的 :point_down: 代码片段展示了在 Intel x86_64 平台上使用内联汇编实现原子 CAS 操作的代码。 ~~~c++ int atomic_CAS(int *addr, int expected, int new_value) { asm volatile("lock cmpxchg %[new], %[ptr]" : "+a" (expected), [ptr] "+m" (*addr) : [new] "r" (new_value) : "memory"); return expected; } ~~~ > 1. `cmpxchg` 指令用于 **比较并交换指定的地址与值**,他先比较地址 `addr`(对应于 `%[ptr]`)中存储的值与寄存器 `%eax` (即 `expected`)中,否则将 `addr` 中存储的数据读入寄存器 `%eax` (即 `expected`)中。 > 2. 通过这条指令加上 **`lock` 前缀**,Intel 处理器可以保证上述比较与交换操作的原子性。 2. **ARM**: 1. 不同于 Intel 处理器的实现,ARM 则采用了 [Load-Link/Store-Conditional](https://en.wikipedia.org/wiki/Load-link/store-conditional)(LL/SC)的指令组合。 2. 在 Load-Link 时,CPU 将使用一个专门的 **监视器** 记录**当前访问的地址**,而在 Store-Conditional 时,当且仅当 **监视的地址没有被其他和修改**,才执行存储操作,否则将返回存储失败。 3. 使用 LL/SC 可以方便地实现之前介绍的两种原子操作,下面将以 **CAS** 为例介绍一下具体原理: 1. 在 **Load-Link** 阶段,该指令将告诉硬件,使用监视器 **监视 `addr` 这个地址**,而当发现 **`addr` 中存储的值与 `expected` 相等**,因而需要 **更新 `addr` 中存储的值** 时,我们 **使用 Store-Conditional 来确保此期间没有其他的核心更新了 `addr` 中存储的值**。 2. 具体的代码如下 :point_down:。 ~~~c++ int atomic_CAS(int *addr, int expected, int new_value) { int oldval, ret; asm volatile( "1: ldxr %w0, %2\n" " cmp %w0, %w3\n" " b.ne 2f\n" " stxr %w1, %w4, %2\n" " cbnz %w1, 1b\n" "2:" : "=&r" (oldval), "=&r" (ret), "+Q" (*addr) : "r" (expected), "r" (new_value) : "memory"); return oldval; } ~~~ 1. `ldxr` 与 `stxr` 分别对应于 Load-Link 操作与 Store-Conditional 操作。 2. 代码中第 5 行利用 `ldxr` 读取 `addr`(对应于 `%2`)的值存入 `oldval`(对应于 `%w0`)中,同时监视 `addr` 这个地址。 3. 第 6 行比较 `oldval` 与 `expected`(对应于 `%w3`)是否相等: 1. 如果不相等,则在第 7 行跳到标记 2 处(第 10 行)结束该函数,并返回旧值。 2. 如果相等,在第 8 行利用 `stxr` 操作将 `new_value`(对应于 `%w4`)存储到地址 `addr` 中,如果有其他核心修改 `addr` 中的值导致存储失败,其会通过设置寄存器 `%w1` 为 1 来告知。 4. 第 9 行通过检查寄存器 `%w1` 的值来判断是否失败,如果失败,则回到标记 1 处(第 5 行)重新尝试原子 CAS。 ###### :lock: 互斥锁抽象 1. 利用硬件保证的原子操作,我们可以实现 **互斥锁** 来解决临界区问题。 2. 互斥锁的抽象简单易用,**一把锁在同一时刻只能被一个线程所拥有**,一旦一个线程获取了锁,其他的线程均不能同时拥有该锁,只能等待锁被释放。 3. 互斥锁提供了 `lock` 与 `unlock` 操作,分别对应于 **加锁**:lock: 与 **解锁** :key:: 1. 为了保证临界区的互斥访问,我们可以在 **申请进入临界区** 时尝试获取 **共享的一把锁**,只有拥有锁的线程才能被允许执行临界区的代码。 2. 在 **退出临界区** 时,线程将释放锁,从而允许其他线程拥有锁并进入临界区。 > 如果将临界区比作一个 **房间**:door:,那么每个线程在进入临界区时,都会将房门 **反锁**,其他线程因此无法进入,只有等待进入房间的线程 **解开反锁**,打开房门时,下一个线程才有机会进入临界区。 4. 互斥锁的实现种类繁多,不同的互斥锁被用于不同的场景,以达到最好的性能表现,下面将介绍两种不同的互斥锁,分别为利用原子 CAS 实现的 **自旋锁** 与利用 FAA 实现的 **排号自旋锁**: 1. **自旋锁** ⭕: 1. 自旋锁利用一个变量 `lock` 来表示 **锁的状态**: 1. `lock` 为 1 表示 **已经有人拿锁**。 2. `lock` 为 0 表示 **锁空闲**。 2. 自旋锁的加锁操作是通过使用原子的 CAS 操作实现的: 1. 在 **加锁** 时,线程会通过 CAS 判断 `lock` **是否空闲**,如果 **空闲则上锁**,否则将一遍一遍重试。 2. 在 **解锁** 时,直接将 `lock` **设置为 0 代表其空闲**。 > 由于大部分 64 位 CPU 对于地址对齐的 64 位单一写操作都是原子的,因此这里不需要使用额外的硬件指令保护写操作的原子性。 3. 自旋锁的 `C` 语言实现代码如下 :point_down:。 ~~~c++ void lock_init(int *lock) { *lock = 0; // 初始化自旋锁 } void lock(int *lock) { while (atomic_CAS(lock, 0, 1) != 0) ; // 循环忙等 } void unlock(int *lock) { *lock = 0; } ~~~ 2. 4. 下面分析一下自旋锁是否满足解决临界区问题的三个条件: 1. 申请进入临界区的线程会尝试将 `lock` 的值从 0 改为 1,但是由于我们使用了原子的 CAS 操作,因此当锁空闲时,多个竞争者中 **只会有一个成功完成 CAS 操作并获取锁**,从而进入临界区(**互斥访问** :white_check_mark: 与 **空闲让进** :white_check_mark:)。 2. 当锁已经被持有时,后续的线程都将无法通过 CAS 操作获取锁,从而无法进入临界区(**互斥访问** :white_check_mark:)。 3. 然而,自旋锁并 **不能保证有限等待** ❌,即 **自旋锁不具有公平性**: 1. 从自旋锁的实现可以看出,自旋锁并非按照申请的顺序决定下一个获取锁的竞争者,而是让所有的竞争者均同时尝试完成原子操作。 2. 而原子操作的成功与否完全取决于 **硬件特性**,比如,在一些 **异构** 环境下(如 ARM 移动 CPU 中的大小核架构),小核由于运行频率低,在与大核竞争时成功完成原子操作的概率远远小于大核。 3. 因此运行在小核上的竞争者可能永远也无法获取锁,从而出现不公平的情况。 4. 不过,由于自旋锁 **实现简单**,在 **竞争程度低** 时非常 **高效**,因此他依然广泛地应用在各种软件之中。 3. **排号自旋锁** 🧑🤝🧑: 1. 不同于前面介绍的自旋锁,排号自旋锁(简称为 **排号锁**)采取了一种更加 **公平** 的选择策略,按照锁竞争者 **申请锁的顺序** 传递锁。 2. 与现实生活中取号排队相似,排号锁的竞争者将依照申请的先后顺序,分得一个 **排队的序号**,排号锁将依照序号,将锁传递给最先到达的竞争者。 3. 因此可以认为,锁的竞争者组成了一个 **先进先出** 的等待队列。 4. 排号锁的 `C` 语言实现代码如下 :point_down:: ~~~c++ struct lock { volatile int owner; volatile int next; } void lock_init(struct lock *lock) { // 初始化排号锁 lock->owner = 0; lock->next = 0; } void lock(struct lock *lock) { // 拿取自己的序号 volatile int my_ticket = atomic_FAA(&lock->next, 1); while(lock-owner != my_ticket) ; // 循环忙等 } void unlock(struct lock *lock) { // 传递给下一位竞争者 lock->owner++; } ~~~ 1. 排号锁的结构体有两个成员,其中 `owner` 表示当前的锁持有者序号,`next` 表示下一个需要分发的序号。 2. **获取排号锁** 需要先通过原子的 `atomic_FAA` 操作 **拿到最新的序号** 并同时 **增加锁的分发序号**,来 **避免其他竞争者拿到相同的序号**。 3. **拿到序号后**,竞争者将通过判断 `owner` 的值,等待排到自己的序号,一旦两者 **相等**,竞争者 **拥有该锁并被允许进入临界区**。 4. 在 **释放锁** 时,锁持有者通过更新 `owner` 的值来 **将锁传递给下一个竞争者**,由于锁的传递顺序是按照 **申请锁的顺序** 而定的,因此该锁 **保证了获取锁的公平性**,且锁的竞争者均会 **在有限时间内拿到锁并进入临界区**(**有限等待**)。 5. 虽然互斥锁能保证在多核环境下对共享资源操作的正确性,但在实际场景中,对于同步的需求多种多样,仅仅依靠互斥是远远不够的 :sob: ,后面将介绍一些其他基于互斥锁的同步原语,他们为开发者应对不同场景提供了更加易用的接口 :happy: 。 ### 🫠 条件变量 #### 为什么需要条件变量 1. 在前面列举的生产者消费者例子中,当剩余的空位为 0 时,生产者会陷入 **循环等待**,而在消费者释放空位之前,循环等待都是毫无意义的,不仅浪费了 CPU 资源,还增加了系统的能耗。 2. 因此,我们希望使用一种 **挂起** / **唤醒** 的机制来解决这个问题,**条件变量** 便是为了满足这样的需求而设计的。 3. 通过使用条件变量提供的接口: 1. 一个线程可以 **停止使用 :no_entry_sign: CPU** 并将自己 **挂起** :sleeping: 。 2. 当 **等待的条件满足** 时,**其他线程** 会 **唤醒 ⏰ 该挂起的线程** 让其继续执行。 4. 使用条件变量能够有效避免无谓的循环等待,比如在生产者消费者例子中,使用条件变量可以让生产者等待在『是否还有空位』这个条件上,待到消费者发现已经有新的空位之后再将其唤醒,从而避免生产者无意义的循环等待。 #### 条件变量的使用 1. 条件变量提供了两个接口,`cond_wait` 和 `cond_signal`,分别用于 **挂起当前线程** 与 **唤醒等待在该条件变量上的线程**: 1. `cond_wait`: > 条件变量必须搭配一个 **互斥锁** 一起使用,该互斥锁用于 **保护对条件的判断与修改**。 1. `cond_wait` 接收两个参数,分别为 **条件变量** 与 **搭配使用的互斥锁**(在临界区中)。 2. `cond_wait` 在 **挂起** 当前线程的同时,该互斥锁会被原子地 **释放**。 3. 此时,其他线程则有机会进入临界区,满足该线程的等待条件,然后利用 `cond_signal` **唤醒** 该线程。 4. 挂起的线程被唤醒之后,其会在 `cond_wait` 返回之前 **重新获取互斥锁**。 2. `cond_signal`: 1. `cond_signal` 只接收一个参数,**当前使用的条件变量**。 2. 虽然 `cond_signal` 没有明确要求必须持有搭配的互斥锁(在临界区中),但在临界区外使用 `cond_signal` 需要非常小心才能保证不丢失唤醒。 2. 下面以生产者消费者为例介绍条件变量的具体使用方法: > 消费者的实现与生产者的实现互为镜像,所以这里主要分析生产者的代码。 1. 使用两个不同的互斥锁 `empty_cnt_lock` 与 `filled_cnt_lock` 来 **保护对共享计数器 `empty_slot` 与 `filled_slot` 的修改**。 > 虽然可以使用原子操作来保证并发下的正确性,但在 **使用条件变量时**,其 **必须搭配互斥锁一起使用**。 2. 针对生产者与消费者等待的条件,这里也创建了两个不同的条件变量(`empty_cond` 与 `filled_cond`),分别对应于『**缓冲区无空位**』和『**缓冲区无数据**』这两个条件。 3. 当生产者发现 **没有空闲位置** 时,就会使用 `cond_wait` 进入阻塞状态,避免循环等待。 > 由于生产者需要等待的条件是『**缓冲区没有满**』,即 `empty_slot` 不为 0,因此这里等待的条件变量为 `empty_cond`。 > > 当有 **新的空位** 产生时,消费者会利用 `cond_signal` 操作唤醒等待在 `empty_cond` 上的生产者。 4. 生产者在 **更新完 `filled_slot` 计数器** 之后,会加入一个 `cond_signal` 操作,用于 **唤醒等待的消费者**,由于消费者等待的条件是『**缓冲区不为空**』,即 `filled_slot` 不为 0,因此这里唤醒的是等待在 `filled_cond` 上的消费者。 ~~~c++ int empty_slot = 5; int filled_slot = 0; struct cond empty_cond; struct lock empty_cnt_lock; struct cond filled_cond; struct lock filled_cnt_lock; void producer(void) { int new_msg; while(TRUE) { new_msg = product_new(); lock(&empty_cnt_lock); while(empty_slot == 0) cond_wait(&empty_cond, &empty_cnt_lock); empty_slot--; unlock(&empty_cnt_lock); buffer_add_safe(new_msg); lock(&filled_cnt_lock); filled_slot++; cond_signal(&filled_cond); unlock(&filled_cnt_lock); } } void consumer(void) { int cur_msg; while(TRUE) { lock(&filled_cnt_lock); while(&filled_slot == 0) cons_wait(&filled_cond, &filled_cnt_lock); filled_slot--; unlock(&filled_cnt_lock); cur_msg = buffer_remove_safe(); lock(&empty_cnt_lock); empty_slot++; cond_signal(&empty_cond); unlock(&empty_cnt_lock); consume_msg(cur_msg); } } ~~~ > :thinking: 这里生产者每次都调用 `cond_signal` 是否有必要,能否减少消费者被唤醒的次数? > > 1. 一种错误 ❌ 的想法是只有在 `filled_slot` 为 0 时才会有消费者等待在该条件变量上,因此只有在这种情况下才需要调用 `cond_signal`。 > 2. :white_check_mark: 然而,在 `filled_slot` 为 0 时,可能会有 **多个消费者等待**,如果此时有多个生产者到达,只在 `filled_slot` 为 0 时调用一次 `cond_signal`,则 **只能唤醒等待的消费者中的一个**,其他消费者将陷入 **无限的等待**。 #### 条件变量的实现 > 条件变量的实现需要考虑并发环境下的正确性。 条件变量的一种实现如下面的 :point_down: 代码所示: ~~~c++ struct cond { struct thread *wait_list; } void cond_wait(struct cond *cond, struct lock *mutex) { list_append(cond->wait_list, thread_self()); atomic_block_unlock(mutex); // 原子挂起并解锁 lock(mutex); // 重新获得互斥锁 } void cond_signal(struct cond *cond) { if(!list_empty(cond->wait_list)) wakeup(list_remove(cond->wait_list)); } void cond_broadcast(struct cond *cond) { while(!list_empty(cond->wait_list)) wakeup(list_remove(cond->wait_list)); } ~~~ 1. 从上面的代码中我们可以看出每个条件变量的结构体中都包含一个 `wait_list`,用于 **记录等待在该条件变量上的线程**。 2. 线程在调用 `cond_wait` **挂起** 自己时,需要完成以下两个步骤: 1. 将当前线程 **加入等待队列**。 2. 原子地 **挂起当前线程** 并 **解锁**,这里使用 `atomic_block_unlock` 操作来原子地完成挂起与解锁操作,而这个操作应当 **由操作系统辅助完成**。 > :thinking: 以上两个步骤中 **第二步** 最为关键:exclamation:,必须保证挂起与解锁 **原子地完成**: > > 1. 如果我们 **先解锁再挂起当前线程**,则有可能存在其他线程在解锁与挂起的间隙调用 `cond_signal` 操作,由于当前线程还未挂起,`cond_signal` 操作无法唤醒该线程,也无法阻拦该线程挂起,最终导致该线程 **挂起后无法被唤醒**,出现错误。 > 2. 如果我们 **先挂起再解锁**,**被挂起的线程便无法成功解锁**,**后续线程也不能进入临界区唤醒挂起的线程**,同样会导致错误。 > :wink: 当线程被其他线程 **唤醒** 之后,其应当 **再次获取锁进入临界区并返回**。 3. 线程在调用 `con_signal` **唤醒** 一个 **等待在条件变量上的线程** 时: 1. 首先判断 **是否有线程等待在条件变量上**,如果存在这样的线程,则利用 **操作系统提供的唤醒服务 `wakeup`** 将该线程唤醒。 > :information_desk_person: 除了 `cond_wait` 与 `cond_signal`,条件变量一般还提供 **广播** :loud_sound: 操作,用于唤醒 **所有** 等待在条件变量上的线程。 > :thinking: 互斥锁与条件变量有什么区别? > > 1. 互斥锁与条件变量解决的不是同一个问题: > 1. 互斥锁用于解决 **临界区问题**,**保证互斥访问共享资源**。 > 2. 条件变量通过提供 **挂起** / **唤醒** 机制来 **避免循环等待**,**节省 CPU 资源**。 > 2. 条件变量需要和互斥锁**搭配使用**。 ### :signal_strength: 信号量 #### 什么是信号量 详见 [System V 信号量](https://notebook.ricear.com/project-26/doc-324/#3-3-2-%E5%90%AB%E4%B9%89)。 #### 信号量的使用 使用信号量来解决生产者消费者问题的代码如下所示 :point_down:: ~~~c++ sem_t empty_slot; sem_t filled_slot; void producer(void) { int new_msg; while(TRUE) { new_msg = produce_new(); wait(&empty_slot); // P buffer_add_safe(new_msg); signal(&filled_slot); // V } } void consumer(void) { int cur_msg; while(TRUE) { wait(&filled_slot); // P cur_msg = buffer_remove_safe(); signal(&empty_slot); // V consume_msg(cur_msg); } } ~~~ 1. 上面的 :point_up_2: 两个信号量分别代表两个 **共享的资源**: 1. `empty_slot` 代表 **空余的缓冲区资源**。 2. `filled_slot` 代表 **已填入内容的缓冲区资源**。 2. 当生产者与消费者均未开始工作且缓冲区中没有填入数据之时,**空余的缓冲区数量等于缓冲区的大小**,**而已填入内容的缓冲区的数量为 0**,因此对于 `empty_slot` 与 `filled_slot` 的初值,我们应该分别填入缓冲区的大小和 0。 3. 生产者在处理流程中应当先 **等待空余的缓冲区资源**,而在填入内容后 **增加已填入的缓冲区资源的数量**,消费者和生产者刚好相反。 #### 信号量的实现 使用 **互斥锁** 和 **条件变量** 实现信号量的代码如下 :point_down:: ~~~c++ struct sem { int value; int wakeup; struct lock sem_lock; struct cond sem_cond; } void wait(struct sem *S) { lock(&S->sem_lock); S->value--; if (S->value < 0) { do( cond_wait(&S->sem_cond, &S->sem_lock); ) while (S->wakeup == 0); S->wakeup--; } unlock(&S->sem_lock); } void signal(struct sem *S) { lock(&S->sem_lock); S->value++; if (S->value <= 0) { S->wakeup++; cond_signal(&S->sem_cond); } unlock(&S->sem_lock); } ~~~ 1. 信号量结构体包含四个成员,除了 **条件变量** 和 **互斥锁** 之外,还包含 `value` 和 `wakeup`: 1. `value` 的值即可为正数,也可为负数: 1. 当 **没有线程等待** 时,`value` 为 **正数** 或 **零**,其表示 **剩余的资源数量**。 2. 当 **有线程等待** 时,`value` 为 **负数**,其 **绝对值** 代表 **正在等待获取资源的线程数量**。 2. `wakeup` 表示 **有线程等待** 时的 **可用资源数量**,其只能为 **正数** 或 **零**,该值同时也代表 **应当唤醒的线程数量**。 > 在该实现中,如果想判断 **当前可用的资源数量**: > > 1. 首先应当查看 `value`,若 `value` 为 **正数** 或 **零**,则取 `value` 为当前资源数量。 > 2. 否则,取 `wakeup` 为当前资源数量。 2. 对于信号量的 `wait` 操作: 1. 当线程在 **获取该信号量的互斥锁** 后,先 **将信号量的 `value` 减 1** : 1. 如果 `value` 的值 **大于等于 0**,则代表 **有空闲的资源**,`wait` **操作成功**。 2. 如果 `value` 的值 **小于 0**,那么他 **不再能表示当前可用的资源数量**,我们需要去检查 `wakeup` 来判断是否还有空闲的资源可以消耗: 1. 如果 `wakeup` **大于 0** 时,代表还 **有空闲的资源可以消耗**。 2. 如果 `wakeup` **等于 0**,此时需要用 `cond_wait` **将当前线程挂起等待**。 3. 对于信号量的 `signal` 操作: 1. 线程首先将 `value` 的值 **加 1**。 2. 然后判断 **是否还有线程等待在该信号量上**,如果 **有**,则 **增加 `wakeup`**,并使用 `cond_signal` **唤醒其中的一个线程来获取资源**。 > :thinking: 互斥锁与信号量的区别是什么? > > 1. 互斥锁与信号量有 **相似** 之处: > 1. 当信号量的初值设置为 1,且只允许其值在 0 和 1 之间变化时,`wait` 和 `signal` 操作分别与互斥锁的 `lock` 和 `unlock` 操作类似,这种信号量被称为 **二元信号量**。 > 2. 二元信号量与互斥锁的差别在于: > 1. 互斥锁有 **拥有者** 这一概念,往往由 **同一个线程** 加锁和解锁。 > 2. 二元信号量没有拥有者这一概念,允许 **不同线程** 执行 `wait` 与 `signal` 操作。 > 2. 互斥锁与 **计数信号量**(非二元信号量的其他信号量)**区别较大**: > 1. 互斥锁 **同一时刻** 只允许 **一个线程** 获取,而计数信号量允许 **多个线程** 通过,其 **数量等于剩余可用资源数量**。 > 2. 互斥锁用于保证 **多个线程对一个共享资源的互斥访问**,而信号量则用于协调 **多个线程对一系列共享资源的有序操作**: > 1. 互斥锁可以用于保护生产者消费者问题中的共享缓冲区。 > 2. 信号量可以用于协调生产者与消费者何时该等待,何时该放入或拿取缓冲区数据。 > :wink: 条件变量与信号量的区别? > > 本文中信号量是由条件变量、互斥锁以及计数器实现的,而这个计数器就是信号量的核心,用于表示当前可用资源的数量,因此可以理解为 **信号量利用条件变量实现了更高层次的抽象**。 ### :lock_with_ink_pen: 读写锁 #### 为什么需要读写锁 1. 在上面的 :point_up_2: 介绍中,所有对 **共享资源** 的操作均需要使用 **互斥锁** 进行保护,以保证在同一时刻,只能至多执行其中的 **一个** 操作。 2. 但是,考虑这样一种情况: 1. 有三个线程需要对共享数据进行操作,其中两个线程只需要 **读** :open_book: 共享的数据,而只有一个线程需要 **写** :writing_hand: 这个共享的数据。 2. 需要读共享数据的线程之间,相互不会产生干扰,我们需要保证的是 **写共享数据的线程与读共享数据的线程不能同时执行** 。 3. 在这种情况下,虽然直接使用互斥锁不会导致任何正确性问题,但是会 **消减读者之间的并行度**,造成一定的 **性能损失**。 3. **读写锁** 就是为了解决这个问题而提出的同步原语,下面我们将对读写锁的使用方法及其实现进行详细介绍 🫠。 #### 读写锁的使用 读写锁的示例代码如下所示 :point_down:: ~~~c++ struct rwlock lock; char data[SIZE]; void reader(void) { lock_reader(&lock); read_data(data); // 读临界区 unlock_reader(&lock); } void writer(void) { lock_writer(&lock); update_data(data); // 写临界区 unlock_writer(&lock); } ~~~ 1. 读写锁针对 **读者**(读共享数据的线程)与 **写者**(写共享数据的线程)分别提供了不同的 `lock` 与 `unlock` 操作: 1. 读者需要使用 `lock_reader` 与 `unlock_reader` 来保护 **读临界区**,读临界区 **只能保证读者与写者互斥**,**不能保证读者与读者互斥**: 1. 如果此时有写者在同一把读写锁保护的临界区内,读者就必须等待该写者退出临界区后才能进入。 2. 如果只有读者在临界区内,后续的读者就可能可以进入临界区。 2. 写者需要使用 `lock_writer` 与 `unlock_writer` 来保护写 **临界区**: 1. 写临界区既 **不允许其他读者** 进入读写锁保护的读临界区,也 **不允许其他写者** 进入读写锁保护的临界区。 #### 读写锁的实现 1. 读写锁允许 **多个读者** 同时进入读临界区,因此带来了 **倾向性** 问题: 1. 假设在某一时刻,已经有一些读者在临界区中,此时有一个写者与一个读者同时申请进入临界区,我们是否应该允许读者直接进入读临界区: 1. 如果允许 **读者** 直接进入,那么 **写者会一直被阻塞**,直到没有任何读者,这种被称为 **偏向读者的读写锁**。 2. 如果等之间的读者离开临界区后,先允许 **写者** 进入,再允许读者进入,这样虽能避免写者陷入无限等待,但同时也 **减少了读者的并行性**,这种称为 **偏向写者的读写锁**。 2. 读写锁的具体实现如下所示 :point_down:: 1. **偏向读者的读写锁**: 1. 读写锁的结构体包含一个用于表示 **当前读者数目** 的计数器 :timer_clock: `reader_cnt`,其在初始化时被设置为 0,除此之外,该结构体还包含一个 **控制读者对 `reader_cnt` 进行更新** 的互斥锁 :lock: `reader_lock`,与一个用于 **保证读者与写者**、**写者与写者之间互斥** 的互斥锁 :lock_with_ink_pen: `writer_lock`。 2. 读者在 **上锁**(`lock_reader`)时,需要使用 `reader_lock` 来保证对 `reader_cnt` 更新的原子性,如果当前的读者时 **第一个读者**,其还需要获取 `writer_lock`,从而 **保证读者与写者的互斥**,**阻塞后续写者**。 3. 读者在 **解锁**(`unlock_reader`)时,如果发现自己时 **最后一个读者**,其还需要释放 `writer_lock`,用于 **取消对写者的阻塞**,**后续的读者**在进入读临界区之前,必须 **再次获取** `writer_lock`。 4. 具体的实现如下面的代码所示 :point_down:: ~~~c++ struct rwlock { int reader_cnt; struct lock reader_lock; struct lock writer_lock; }; void lock_reader(struct rwlock *lock) { lock(&lock->reader_lock); lock-reader_cnt++; if (lock ->reader_cnt == 1) // 第一个读者 lock(&lock->writer_lock); unlock(&lock->reader_lock); } void unlock_reader(struct rwlock *lock) { lock(&lock->reader_lock); lock->reader_cnt--; if(lock->reader_cnt == 0) // 最后一个读者 unlock(&lock->writer_lock); unlock(&lock->reader_lock); } void lock_writer(struct rwlock *lock) { lock(&lock->writer_lock); } void unlock_writer(struct rwlock *lock) { unlock(&lock->writer_lock); } ~~~ > :thinking: 我们本想优化读者,可读者的加锁解锁流程相比于互斥锁怎么反而更加复杂了,读者还需要单独的一个读者锁,这是不是与『读者不互斥』的目标冲突了? > > 1. 读者加锁解锁流程在这个实现中确实较互斥锁更加复杂,但是读写锁的核心目标不是简化读者加锁解锁的流程 ❌ ,而是 **增加读者之间的并行度** :white_check_mark:。 > 2. 使用读写锁,不同的读者现在可以 **同时 :two_men_holding_hands: 处于读临界区**,而不是必须串行地、一个一个地 :bow: 进入读临界区,这将大大提高 **读操作多** :open_mouth: 的应用程序的性能。 > 3. 至于额外的读者锁,在 **临界区开始之前** 就已经执行了解锁操作,其保护的是对 **读写锁元数据** 的修改,而不是接下来的读临界区 🫠。 2. **偏向写者的读写锁**: 1. 由于偏向写者的读写锁需要在 **有新的写者到达时阻塞后续读者**,因此其除了用于记录 **当前读者数量** 的 `reader_cnt` 之外,还添加了一个布尔变量 `has_writer` 来表示 **当前是否有写者到达**: 1. 当有写者 **期望获取锁 🥹** 时,即使其还未真正进入临界区,也会将 `has_writer` 设置为 `TRUE`。 2. 读者将通过判断 `has_writer` 的值来决定是否需要等待前序的写者。 2. 由于写者也会操作读写锁中的 **元数据** :atom_symbol: (包括 `has_writer` 与 `reader_cnt`),因此偏向写者的读写锁提供了一把 **读者和写者共享** 的锁 `rwlock->lock`,要求读者与写者在 **操作元数据前** 获取该锁。 3. 读者在申请进入临界区(`lock_reader`)时,需要通过 `has_writer` 判断是否有其他写者在临界区内: 1. 如果观察到 `has_writer` 被置为 `TRUE`,则 **不能进入临界区**,需要 **使用条件变量挂起等待** :bow: 。 2. 当确保没有写者等待时,读者在 **增加读者计数器** :timer_clock: `reader_cnt` 并 **解锁** :key: 后便可进入读临界区。 4. 在写者需要进入临界区(`lock_writer`)时,也需要通过 `has_writer` 判断是否有其他写者在临界区内: 1. 如果存在其他写者,则需要 **使用条件变量挂起等待** :bow:。 2. 当没有其他写者时,当前写者就可以 **将 `has_writer` 设置为 `TRUE`**,并 **使用条件变量等待之前所有的读者离开读临界区**,最后,**释放保护元数据的锁即可进入写临界区**。 5. 读者在解锁时,除了需要 **减少读者的计数器 `reader_cnt`**,还需要判断自己 **是否为最后一个读者**,如果是最后一个读者,还需要使用 `cond_signal` 来 **唤醒 :open_mouth: 等待在 `reader_cnt` 上的写者**。 6. 写者在解锁时,除了**设置 `has_writer` 为 `FALSE`**,还应当使用 `cond_broadcast` **将等待在 `has_writer` 上的读者和写者都唤醒**。 > 这里使用 `cond_broadcast` 是因为读者与写者都有可能等待在 `has_writer` 上,因此两者都需要被唤醒,如果使用 `cond_signal`,有可能只唤醒了一个读者,那么等待在 `has_writer` 上的写者将永远不会被唤醒,从而陷入无限等待。 7. 具体的实现如下面的代码所示 :point_down:: ~~~c++ struct rwlock { volatile int reader_cnt; volatile bool has_writer; struct lock lock; struct cond reader_cond; struct cond writer_cond; } void lock_reader(struct rwlock *rwlock) { lock(&rwlock->lock); while(rwlock->has_writer == TRUE) cond_wait(&rwlock->writer_cond, &rwlock->lock) rwlock->reader_cnt++; unlock(&rwlock->lock); } void unlock_reader(struct rwlock *rwlock) { lock(&rwlock->lock); rwlock->reader_cnt--; if (rwlock->reader_cnt == 0) cond_signal(&rwlock->reader_cond); unlock(&rwlock->lock); } void lock_writer(struct rwlock *rwlock) { lock(&rwlock->lock); while(rwlock->has_writer == TRUE) cond_wait(&rwlock->writer_cond, &rwlock->lock); rwlock->has_writer = TRUE; while(rwlock->reader_cnt > 0) cond_wait(&rwlock->reader_cond, &rwlock->lock); unlock(&rwlock->lock); } void unlock_writer(struct rwlock *rwlock) { lock(&rwlock->lock); rwlock->has_writer = FALSE; cond_broadcast(&rwlock->writer_cond); unlock(&rwlock->lock); } ~~~ 3. 从上面介绍的两种读写锁实现中我们可以发现 :information_desk_person: : 1. **偏向读者的读写锁** 能大幅 **提高读者之间的并行度** :runner:,但是由于需要等到某一个时刻 **完全没有读者** 时才能 **更新数据**,因而可能面临很高的 **写延迟** :snail:。 2. **偏向写者的读写锁** 虽然避免了这个问题,让写者能够**在之前的读者离开读临界区后立刻进入临界区** :thumbsup: ,但 **读者之间的并行度会大幅下降** :disappointed:。 3. 因此开发者需要根据具体场景需求,选用合适的读写锁 :thinking:。 ### :package: RCU #### 为什么需要 RCU 1. 虽然读写锁允许多个读者同时进入临界区,但是**写者会阻塞读者**,且**读者**仍然需要**在关键路径上添加读者锁**,会造成一定的**性能开销**。 2. RCU(Read-Copy Update)是一种在操作系统内核中广泛使用的同步原语,其有效地**减少了读者在关键路径上的性能开销**,与读写锁一样,RCU 允许 **多个读者** 同时进入读临界区,但是希望 **写者不会阻塞读者** :smile: ,让读者在写者更新时 :writing_hand:,要么读到 **旧的数据** :older_man: ,要么读到 **新的值** 👦🏻,而 **不能观察到任何中间结果** 🧓,为了实现这一目标,RCU 中引入了一种 **订阅** :up: / **发布** :book: 机制,下面 :point_down: 我们会对这一机制的原理进行详细阐述。 #### 订阅 / 发布机制 1. 订阅 / 发布机制主要利用 **对 64 位指针**(或 32 为机器上的 32 位指针)**的读写的原子性**,**让写者能够原子地更新任意大小的数据**。 > 主流的现代 CPU 均提供对 **地址对齐** 的单一读写操作的原子性保证,其支持的位宽往往与 CPU 的位宽一致。 > > 例如,64 位的 CPU 往往能够保证对 **地址对齐的 64 位数据** 的读写操作的原子性,若该 CPU 中的一个核心需要对一个 64 位的指针变量进行更新,从 `0x0` 更新到 `0x1234567887654321`,这个更新要么**一次性被其他核心看见**(如读到该指针为 `0x1234567887654321`),要么**全部都不可见**(如读到该指针为 `0x0`),**不会出现其他核心读到任何中间数据**(如读到该指针为 `0x1234567800000000`)的情况。 2. 下面以向一个链表 :chains: 中插入新节点为例来介绍一下订阅 / 发布机制的原理: ![image-20220720152042029](https://notebook.ricear.com/media/202207/2022-07-20_152048_2699150.24798023072491393.png) 1. 插入新节点的操作分两步完成: 1. 第一步,先初始化新节点 $C$,即填入节点 $C$ 的数据和指向节点 $B$ 的指针。 2. 第二步,通过更新节点 $A$ 中的指针,使其指向节点 $C$。 > 尽管节点 $C$ 中的数据大小远远超过了 64 位,但其依赖于第二步的指针更新操作才能被新的读者看见,这正是因为对 64 位指针的更新操作是可以由硬件保证原子性的,因此,读者要么读到 **旧的指针** :older_man:,无法遍历到节点 $C$;要么读到 **新的指针** :boy:,可以遍历到节点 $C$。通过这个机制,可以让写者 **原子地更新任意大小的数据**。 2. 订阅与发布过程: 1. 当读者遍历到节点 $A$ 时,读取 `Next` 指针寻找下一个节点的操作就是 **订阅** 操作,读者必须使用 **订阅接口** 来**读取指针并解析其指向的内容**(如 `next = reu_defeference(&cur->Next)`)。 2. 上图中展示的第二步即是 **发布** 操作,写者需要利用 **发布接口** 完成 **指针的更新**(如 `rcu_assign_pointer(&NodeA->Next, NodeC)`)。 > :thinking: 订阅 / 发布操作本质上就是以此读操作 / 写操作,为何还需要将他们抽象为订阅和发布的接口呢? > > 1. 实际上,这涉及访存操作顺序的问题。在实际硬件中,为了更好的性能,有的 **CPU** :kiwi_fruit: 会允许 **访存操作乱序执行**。除此之外,**编译器** :tractor: 也会为了相同的目标,在编译成二进制代码时,尝试让 **没有依赖关系** 的 **访存操作乱序执行**。 > > > 有关指令重排序相关的内容可参考 [指令重排](https://notebook.ricear.com/project-34/doc-527)。 > 2. 如果只使用读 / 操作对 `Next` 指针进行更新,很可能在第一步 **填入节点 $C$ 的内容** 的访存操作与 **更新 `Next` 指针** 的操作之间产生了 **乱序**,使得读者在发现 `Next` 指针有更新并遍历到节点 $C$ 时,节点 $C$ 的数据还没有初始化完成,最终导致读者读到一个错误的数据。 > 3. 因此,我们需要保证第一步与第二步的访存顺序,这里提出的订阅 / 发布机制便是将 **保证顺序的操作打包成两个接口**,便于开发者使用。 > :information_desk_person: 上面 :point_up_2: 的 `rcu_assign_pointer(&NodeA->Next, NodeC)` 可以保证从 **编译器** 和 **CPU** 层面上 `&NodeA->Next` 被赋值前,`NodeC` 能够初始化并赋值完成,这个函数的具体实现(Linux kernel 4.11.4)如下所示 :point_down:: > > ~~~assembly > #define rcu_assign_pointer(p, v) > ({ > uintptr_t _r_a_p__v = (uintptr_t)(v); > > if (__builtin_constant_p(v) && (_r_a_p__v) == (uintptr_t)NULL) > WRITE_ONCE((p), (typeof(p))(_r_a_p__v)); > else > smp_store_release(&p, RCU_INITIALIZER((typeof(p))_r_a_p__v)); > _r_a_p__v; > }) > ~~~ > > 该段代码做了两件事: > > - 在必要时插入一个 [内存屏障](https://notebook.ricear.com/project-34/doc-527/?highlight=%E5%86%85%E5%AD%98%E5%B1%8F%E9%9A%9C#5-%E5%86%85%E5%AD%98%E5%B1%8F%E9%9A%9C)。 > - 关闭编译器 :tractor: 在 **赋值时的非顺序编译优化**,保证 **赋值时已经初始化** 了。 #### 宽限期 > 虽然订阅 / 发布机制可以保证读者要么读到旧的数据 :older_man:,要么读到新的数据 :boy:,但是旧的数据何时能够被安全地回收 :recycle:,在更新完指针之后,写者并不能确定是否还有读者正在读取旧的数据,此时还不能回收旧的数据占用的内存。 1. 为了确定何时能够回收资源,RCU 引入了一个叫作 **宽限期** :alarm_clock: 的概念,用于描述从写者 **更新指针** 到 **最后一个可能观察到旧数据的读者离开** 的这段时间。 2. 在宽限期中,由于可能有读者在使用旧的数据,因此旧数据不能被回收 :x: ,而当宽限期结束之时,没有读者会再观察到旧数据了,此时旧数据可以被回收 :white_check_mark:。 3. 为了实现这个目标,**写者** 必须区分出有可能 **观察到旧数据的读者**,而 **读者** 也必须标记 **自己的读临界区开始与结束的位置**,为此,RCU 为读者提供了两个接口(`rcu_read_lock` 与 `rcu_read_unlock`)分别用于标记 :pushpin: **读临界区的开始与结束**。 > :warning: 这里虽然有 `lock` / `unlock` 关键字,但并非意味着读者需要使用耗时的互斥锁来保护读临界区,这里的两个函数仅仅用于标识 **读临界区的开始与结束**,其实现可以是 **对某一时段的读者计数器 🧮 的增减操作**。 4. 下面以链表的删除 🗑 操作为例阐述一下宽限期的具体使用方法: ![image-20220720172804732](https://notebook.ricear.com/media/202207/2022-07-20_172810_7078380.668259276414421.png) ``` 1. 上图 :point_up_2: 中第一条竖线代表写者更新指针的时间点 :timer_clock:,在此时间点 **之前** :arrow_backward: 进入临界区的读者,都有可能观察到节点 $C$ 的数据(包括 `Reader-1/2/3`)。 2. 写者必须等待 **最后一个** :last_quarter_moon: 可能观察到节点 $C$ 的数据的读者离开临界区之后,才能回收 ♻️ 节点 $C$,上图中满足此条件的为 `Reader-2`,因此当 `Reader-2` 离开临界区时,写者就可以回收节点 $C$。 3. 而对于在更新 `Next` 指针之后才进入读临界区的 `Reader-4` 与 `Reader-5`,由于其不可能再观察到节点 $C$,因此无需等待这两个读者。 4. RCU 为写者提供了 `synchronize_rcu` 接口 👏,用于 **阻塞写者直到宽限期结束**,写者需要调用 `synchronize_rcu`,然后才能 **回收 :recycle: 需要删除的内容**。 ``` #### 总结 1. 通过使用订阅 / 发布机制来 **更新数据** 以及使用宽限期机制来 **回收失效数据**,RCU 允许读者在有写者并发写入共享数据时,也能无须任何耗时的同步操作、不被阻塞地随意读取共享数据,读者只需标明 :pushpin: **读临界区的开始与结束**,以及 **使用订阅接口 :gear: 读取指针** 即可,而写者则需要 **使用互斥锁 :lock: 来保证写者之间的互斥访问**。 2. 当然,RCU 并非完美 🫠: 1. 其最大限制为其 **仅适用于 🚫 能够使用订阅 / 发布机制来更新的对象**,对于双向链表这种更为复杂的数据结构,由于需要同时更新两个指针,RCU 就很难实现。 2. 此外,RCU 的写者需要一直等到最后一个可能观察到旧数据的读者离开临界区后才能回收就的数据,这个 **等待的耗时长短** :timer_clock: 也会充满不确定性 :man_shrugging:: 1. 因此,RCU 往往还提供 **异步的回收机制** :open_mouth: ,将等待与回收的开销从写者的关键路径上剔除。 2. 例如,在 Linux 内核中,提供了 `call_rcu` 函数,该函数要求写者传入一个 **回调函数**,其会在 **所有读者离开临界区** 时被调用,写者在调用 `call_rcu` **注册回调函数后会立即返回** :runner: ,无需等待所有读者离开。 > :thinking: 读写锁与 RCU 的区别是什么? > > 1. 这两个同步原语都可用于应对 **读多写少** 的场景,通过增加 **读者的并行度** 来提升应用程序的性能,不同的是: > 1. RCU 中的读者 **不会被写者阻塞** 且 **无须使用耗时的同步操作**,因此 RCU 中的读者的 **开销更小**。 > 2. 读写锁的接口较为 **方便**,可以轻松用于不同场景。 > 2. 下面 :point_down: 通过分析分别使用读写锁与 RCU 保护对链表 :chains: 进行修改的操作,进一步展示两者的区别,假设系统中存在多个读者 :family: 需要遍历该链表,同时存在一个写者 :person_with_pouting_face: 需要更新节点 $C$ 的内容: > 1. 在使用 **读写锁** :lock_with_ink_pen: 时: > 1. 若 **读者** :book: 先获得了读者锁,则写者必须等待 **所有读者读完旧的数据**(即 `Data-C`),才能进入临界区将数据更新为 `Data-C'`。 > 2. 若 **写者** :writing_hand: 先获得了写者锁,则所有读者都必须 **等待更新结束** 后才能开始遍历,这些读者将全部读到 `Data-C'`。 > 2. 在使用 **RCU** :package: 时: > 1. 读者和写者可以 **同时执行** 临界区,其中一部分读者在写者更新 `NodeA->Next` **之前** 便已到达节点 $C$,他们将读到旧的数据(即 `Data-C`),剩下的读者在写者更新完 `NodeA-Next` **之后** 到达节点 $C$,他们将读到新的数据(即 `Data-C'`)。 ### :hourglass: 管程 #### 为什么需要管程 > **线程安全** :construction_worker: 是指某个函数、函数库在 **多线程** 环境中被调用时,能够正确地使用同步原语保护多个线程对 **共享变量** 的访问与修改。 前面介绍的 **互斥锁**、**条件变量**、**信号量** 和 **读写锁** 这些同步原语可以帮助开发者实现线程安全的应用程序 :thumbsup: ,然而,这些工具拥有各不相同的语义和适用环境 :disappointed_relieved:,开发者在实际应用程序中稍有不慎就会出现 **死锁** :recycle: 等错误,因此,为了减轻开发者的负担,人们提出了 **管程** 这一抽象概念。 #### 管程的含义 1. 管程包含两部分内容,一部分是 **共享的数据** :shamrock:,另一部分是 **操作共享数据的函数** :hammer_and_wrench:。 2. 管程保证在同一时刻最多只有 **一个** ☝🏻 操作者能够进入管程的保护区域 **访问共享数据**。 3. 由于管程自身已经保证了访问数据的 **互斥性**,开发者只需要直接调用管程提供的函数即可 :relaxed:,无须使用额外的同步原语,从而减轻了开发者的负担。 > :thinking: 管程和其他同步原语的区别是什么? > > 管程是一种高层级 :hushed: 的抽象,其利用其他同步原语实现了一套干净的接口,开发者无须考虑底层同步的实现,就能保证应用程序的正确性 :thumbsup:。 #### 管程的实现 管程一般被用于 **面向对象** 的程序中,如高级语言 Java 就对管程提供了支持,其可以使用 `synchronized` 关键字及其对应的管程来实现不同线程间的同步,详细内容可参考 [Synchronized 实现原理](https://notebook.ricear.com/project-34/doc-815)。 ## 同步带来的问题 同步原语给开发者带来便利的同时,也带来了一系列问题,主要包括 **死锁** :lock:、**活锁** :lock_with_ink_pen: 和 **优先级反转** :arrow_backward:。 ### :lock: 死锁 死锁的相关内容可参考 [死锁](https://notebook.ricear.com/project-26/doc-337)。 ### :closed_lock_with_key: 活锁 1. 除了死锁,在同步中还有 **活锁**,同死锁类似,出现活锁时,锁的竞争者很长一段时间都无法获取锁进入临界区,下面以之前在死锁预防中介绍的 **破坏请求与保持条件** 的方法为例,详细介绍活锁。 2. 破坏请求与保持条件策略要求一次性申请所有的资源,一旦 **任意** 需要获取的资源不可用,则需要放弃 **所有** 已经占有的资源并尝试重新获取,具体实例如下 :point_down: : ~~~c++ while (TRUE) { if(trylock(A) == SUCC) { if(trylock(B) == SUCC) { // 临界区 unlock(B); unlock(A); break; } else { unlock(A); } } } ~~~ 1. 在这个场景中,线程需要获取锁 $A$ 与锁 $B$ 才能进入临界区。 2. 为了避免 **请求与保持** 条件,在成功获取了锁 $A$ 之后,一旦获取锁 $B$ 失败,该线程将立刻放弃锁 $A$,并尝试重新获取锁 $A$ 与锁 $B$。 3. 虽然这样能够避免死锁,但是会导致在特殊情况下 **没有线程** 能够同时获取锁 $A$ 与锁 $B$: 1. 假设存在另外一个线程,其运行代码与上面的代码类似,不同的是,其先获取锁 $B$ 再尝试获取锁 $A$。 2. 当两个线程同时执行时,便有可能出现: 1. 一个线程获取了锁 $A$ 并尝试获取锁 $B$ 时,另一个线程获取了锁 $B$ 并尝试获取锁 $A$。 2. 此时两个线程都无法成功获取锁 $A$ 与锁 $B$,因此只能释放自己已经获取的锁并重试。 3. 若两个线程执行速度相似,则很有可能下一次又出现了同样的情况,再次占有对方尝试获取的锁,但又无法获取全部的资源,最终只能再次释放。 4. 如此往复,这两个线程**都无法成功地同时获取两把锁并进入临界区**,这种现象被称为 **活锁** :open_mouth:。 3. 活锁并 **没有发生阻塞** ❌ ,而是线程不断重复着『尝试 - 失败 - 尝试 - 失败』的过程 :white_check_mark: ,导致在一段时间内没有线程能够成功达到条件并运行下去,:exclamation: 由于活锁实际上没有发生阻塞,因此不同于活锁,活锁可能会 **自行解开** :open_hands:。 > :information_desk_person: 在刚才的例子中,如果两个线程的其中一个没有被操作系统调度执行,另一个线程就有机会一鼓作气拿到两个锁进入临界区。 4. 活锁由于产生的条件比较特殊,并 **没有一种统一的方法类避免** :man_shrugging:,需要结合具体问题来分析和提出避免方案。 > :information_desk_person: 在刚才的例子中,我们可以采取『让线程在获取失败后 **等待随机的时间** 再开始下次尝试』的策略来减少出现活锁的概率,或者『要求所有的线程都按照一个 **统一的顺序** 来获取锁』,从而避免出现活锁。 ### :aerial_tramway: 优先级反转 优先级反转的相关内容可参考 [优先级反转](https://notebook.ricear.com/project-26/doc-328/#3-%E4%BC%98%E5%85%88%E7%BA%A7%E5%8F%8D%E8%BD%AC)。 ## 参考文献 1. 《现代操作系统:原理与实现》 2. [LINUX RCU 机制](http://kerneltravel.net/blog/2021/rcu_szp)。
ricear
July 24, 2022, 5:21 p.m.
©
BY-NC-ND(4.0)
转发文档
Collection documents
Last
Next
手机扫码
Copy link
手机扫一扫转发分享
Copy link
Markdown文件
share
link
type
password
Update password