|
到目前为止的并发,仅仅是进程间的并发,
|
对于一个进程内部还没有并发性的体现。
|
而这就是线程(Thread)出现的起因:
|
|
对于很多应用(以单一进程的形式运行)而言,
|
逻辑上存在多个可并行执行的任务,
|
如果其中一个任务被阻塞,
|
将会引起不依赖该任务的其他任务也被阻塞。
|
|
具体的例子,我们平常用编辑器来编辑文本内容的时候,都会有一个定时自动保存的功能,
|
这个功能的作用是在系统或应用本身出现故障的情况前,已有的文档内容会被提前保存。
|
假设编辑器自动保存时由于磁盘性能导致写入较慢,导致整个进程被操作系统挂起,这就会影响到用户编辑文档的人机交互体验:
即软件的及时响应能力不足,
|
用户只有等到磁盘写入完成后,
|
操作系统重新调度该进程运行后,用户才可编辑。
|
|
|
如果我们把一个进程内的多个可并行执行任务通过一种更细粒度的方式让操作系统进行调度,
|
那么就可以通过处理器时间片切换实现这种细粒度的并发执行。
|
这种细粒度的调度对象就是线程。
|
|
线程定义
线程是进程的组成部分
|
进程可包含1 – n个线程
|
属于同一个进程的线程共享进程的资源
|
基本的线程由线程ID、执行状态、当前指令指针 (PC)、寄存器集合和栈组成。
|
线程是可以被操作系统或用户态调度器独立调度(Scheduling)和分派(Dispatch)的基本单位
|
|
本章之前,进程是程序的基本执行实体
|
是程序关于某数据集合上的一次运行活动
|
是系统进行资源(处理器、 地址空间和文件等)分配和调度的基本单位
|
|
有了线程后,对进程的定义也要调整了,
|
进程是线程的资源容器,
|
线程成为了程序的基本执行实体。
|
|
|
同步互斥
当多个线程共享同一进程的地址空间时
|
每个线程都可以访问属于这个进程的数据(全局变量)。
|
如果每个线程使用到的变量都是其他线程不会读取或者修改的话, 那么就不存在一致性问题。
|
如果变量是只读的,多个线程读取该变量也不会有一致性问题。
|
但是,当一个线程修改变量时, 其他线程在读取这个变量时,可能会看到一个不一致的值,
|
这就是数据不一致性的问题。
|
|
并发术语
共享资源(shared resource):
|
不同的线程/进程都能访问的变量或数据结构。
|
临界区(critical section):
|
访问共享资源的一段代码。
|
竞态条件(race condition):
|
多个线程/进程都进入临界区时,都试图更新共享的数据结构,导致产生了不期望的结果。
|
不确定性(indeterminate):
|
多个线程/进程在执行过程中出现了竞态条件,导致执行结果取决于哪些线程在何时运行, 即执行结果不确定,而开发者期望得到的是确定的结果。
|
互斥(mutual exclusion):
|
一种操作原语,能保证只有一个线程进入临界区,从而避免出现竞态,并产生确定的执行结果。
|
原子性(atomic):
|
一系列操作要么全部完成,要么一个都没执行,不会看到中间状态。在数据库领域, 具有原子性的一系列操作称为事务(transaction)。
|
同步(synchronization):
|
多个并发执行的进程/线程在一些关键点上需要互相等待,这种相互制约的等待称为进程/线程同步。
|
死锁(dead lock):
|
一个线程/进程集合里面的每个线程/进程都在等待只能由这个集合中的其他一个线程/进程 (包括他自身)才能引发的事件,这种情况就是死锁。
|
饥饿(hungry):
|
指一个可运行的线程/进程尽管能继续执行,但由于操作系统的调度而被无限期地忽视,导致不能执行的情况。
|
|
哲学家就餐
|
ch8b_phil_din_mutex
|
计算机科学家 Dijkstra 提出并解决的哲学家就餐问题是经典的进程同步互斥问题。哲学家就餐问题描述如下:
|
有5个哲学家共用一张圆桌,分别坐在周围的5张椅子上,在圆桌上有5个碗和5只筷子,他们的生活方式是交替地进行思考和进餐。 平时,每个哲学家进行思考,饥饿时便试图拿起其左右最靠近他的筷子,只有在他拿到两只筷子时才能进餐。进餐完毕,放下筷子继续思考。
|
|
|
线程概念
结合与进程的比较来说明线程的概念
|
操作系统让进程拥有相互隔离的虚拟的地址空间address space
|
进程感到在独占一个虚拟的处理器
|
只是操作系统通过时分复用和空分复用技术来让每个进程复用有限的物理内存和物理CPU。
|
|
线程是在进程内中的一个新的抽象
|
在没有线程之前,一个进程在一个时刻只有一个执行点(即程序计数器 (PC) 寄存器保存的要执行指令的指针)。
|
线程的引入把进程内的这个单一执行点给扩展为多个执行点
|
即在进程中存在多个线程, 每个线程都有一个执行点。
|
而且这些线程共享进程procee的地址空间address space
|
可以不必采用相对比较复杂的 IPC 机制(一般需要内核的介入)
|
可以很方便地直接访问进程内的数据
|
|
线程的具体运行过程中,需要有程序计数器寄存器来记录当前的执行位置
|
需要有一组通用寄存器记录当前的指令的操作数据
|
需要有一个栈来保存线程执行过程的函数调用栈和局部变量等,
|
形成了线程上下文的主体部分。
|
|
这样如果两个线程运行在一个处理器上,就需要采用类似两个进程运行在一个处理器上的调度/切换管理机制,
|
即需要在一定时刻进行线程切换
|
并进行线程上下文的保存与恢复
|
这样在一个进程中的多线程可以独立运行,
|
取代了进程,
|
成为操作系统调度的基本单位
|
|
由于把进程的结构进行了细化,
|
通过线程来表示对处理器的虚拟化,
|
使得进程成为了管理线程的容器。
|
在进程中的线程没有父子关系,大家都是兄弟,
|
但还是有个老大。
|
|
这个代表老大的线程其实就是创建进程(比如通过 fork 系统调用创建进程)时,建立的第一个线程,它的线程标识符(TID)为 0 。
|
|
|
线程模型与重要系统调用
一种非常简单的线程模型
|
三个运行状态:
|
共享所属进程process的地址空间address space和其他共享资源(如文件等)
|
可被操作系统调度来分时占用CPU执行;
|
可以动态创建和退出;
|
可通过系统调用获得操作系统的服务。
|
|
我们实现的线程模型建立在进程的地址空间抽象之上:
每个线程都共享进程的代码段和和可共享的地址空间(如全局数据段、堆等),
|
但有自己的独占的栈
|
|
线程模型需要操作系统支持一些重要的系统调用:
|
线程创建系统调用
一个进程的运行过程中,进程可以创建多个属于这个进程的线程,
|
每个线程有自己的线程标识符(TID,Thread Identifier)
|
|
pub fn sys_thread_create(entry: usize, arg: usize) -> isize
功能:当前进程创建一个新的线程
|
参数:entry 表示线程的入口函数地址
|
参数:arg:表示线程的一个参数
|
|
当进程调用 thread_create 系统调用后,内核会在这个进程内部创建一个新的线程,
|
这个线程能够访问到进程所拥有的代码段, 堆和其他数据段。
|
但内核会给这个新线程分配一个它专有的用户态栈,
|
这样每个线程才能相对独立地被调度和执行。
|
|
另外,由于用户态进程与内核之间有各自独立的页表,
|
所以二者需要有一个跳板页 TRAMPOLINE 来处理用户态切换到内核态的地址空间平滑转换的事务。
|
所以当出现线程后,在进程中的每个线程也需要有一个独立的跳板页 TRAMPOLINE 来完成同样的事务。
|
|
相比于创建进程的 fork 系统调用,
|
创建线程不需要要建立新的地址空间,
|
另外属于同一进程中的线程之间没有父子关系
|
|
|
|
等待子线程系统调用
pub fn sys_waittid(tid: usize) -> i32
/// 参数:tid表示线程id
|
/// 返回值:如果线程不存在,返回-1;如果线程还没退出,返回-2;其他情况下,返回结束线程的退出码
|
|
当一个线程执行完代表它的功能后,会通过 exit 系统调用退出。
|
内核在收到线程发出的 exit 系统调用后, 会回收线程占用的部分资源,即用户态用到的资源,
比如用户态的栈,
|
用于系统调用和异常处理的跳板页等。
|
|
而该线程的内核态用到的资源,比如内核栈等,需要通过进程/主线程调用 waittid 来回收了,
|
这样整个线程才能被彻底销毁。
|
|
一般情况下进程/主线程要负责通过 waittid 来等待它创建出来的线程(不是主线程)
|
结束并回收它们在内核中的资源 (如线程的内核栈、线程控制块等)。
|
|
如果进程/主线程先调用了 exit 系统调用来退出,
|
那么整个进程 (包括所属的所有线程)都会退出,
|
而对应父进程会通过 waitpid 回收子进程剩余还没被回收的资源。
|
|
|
|
进程相关的系统调用
引入了线程机制后,进程相关的重要系统调用:
fork 、 exec 、 waitpid 虽然在接口上没有变化,
|
但在它要完成的功能上需要有一定的扩展。
|
|
把以前进程中与处理器执行相关的部分拆分到线程中。
|
通过 fork 创建进程其实也意味着要单独建立一个主线程来使用处理器,
|
并为以后创建新的线程建立相应的线程控制块向量。
|
|
相对而言, exec 和 waitpid 这两个系统调用要做的改动比较小,
|
还是按照与之前进程的处理方式来进行。
|
|
总体上看, 进程相关的这三个系统调用还是保持了已有的进程操作的语义,并没有由于引入了线程,而带来大的变化
|
|
|
|
应用程序示例
一个多线程应用程序 threads 的开发过程来展示这些系统调用的使用方法。
|
系统调用封装
在 user/src/syscall.rs 中看到以 sys_* 开头的系统调用的函数原型
|
后续还会在 user/src/lib.rs 中被封装成方便应用程序使用的形式
|
如 sys_thread_create 被封装成 thread_create
|
sys_waittid 被封装成 waittid
waittid 等待一个线程标识符的值为tid 的线程结束
|
当 sys_waittid 返回值为 -2 ,即要等待的线程存在但它却尚未退出的时候
|
主线程调用 yield_ 主动交出 CPU 使用权
|
待下次 CPU 使用权被内核交还给它的时候再次调用 sys_waittid 查看要等待的线程是否退出
|
为了减小 CPU 资源的浪费。
|
这种方法是为了尽可能简化内核的实现
|
|
|
|
多线程应用程序 – threads
多线程应用程序 – threads 开始执行后,先调用 thread_create 创建了三个线程
|
加上进程自带的主线程,其实一共有四个线程。
|
每个线程在打印了1000个字符后,会执行 exit 退出。
|
进程通过 waittid 等待这三个线程结束后,最终结束进程的执行。
|
//usr/src/bin/ch8b_threads.rs
|
|
|
|
线程管理的核心数据结构
还是TCB任务控制块 TaskControlBlock
|
表示线程的核心数据结构。
|
任务管理器 TaskManager
|
管理线程集合的核心数据结构。
|
处理器管理结构 Processor
|
用于线程调度,维护线程的处理器状态。
|
|
线程控制块TCB
内核对线程进行管理的核心数据结构
|
在内核看来,它就等价于一个线程
|
|
|
主要包括在线程初始化之后就不再变化的元数据
|
以及在运行过程中可能发生变化的元数据
|
|
1pub struct TaskControlBlock {
|
|
2 // immutable
|
|
3 pub process: Weak,
|
|
4 pub kernel_stack: KernelStack,
|
|
5 // mutable
|
|
6 inner: UPSafeCell,
|
|
7}
|
|
8
|
|
9pub struct TaskControlBlockInner {
|
|
10 pub trap_cx_ppn: PhysPageNum,
|
|
11 pub task_cx: TaskContext,
|
|
12 pub task_status: TaskStatus,
|
|
13 pub exit_code: Option,
|
|
14 pub res: Option,
1pub struct TaskUserRes {
|
2 pub tid: usize,
|
tid:线程标识符
|
|
3 pub ustack_base: usize,
|
ustack_base:线程的栈顶地址
|
|
4 pub process: Weak,
|
process:线程所属的进程
|
|
5}
|
|
用户态的线程代码执行需要的信息,这些在线程初始化之后就不再变化:
|
|
15}
|
|
|
|
包含线程的进程控制块PCB
程把与处理器执行相关的部分都移到了 TaskControlBlock 中
|
组织为一个线程控制块向量中
|
自然对应到多个线程的管理上了
|
|
而 RecycleAllocator 是对之前的 PidAllocator 的一个升级版
|
一个相对通用的资源分配器,可用于分配进程标识符(PID)和线程的内核栈(KernelStack)。
|
|
|
1pub struct ProcessControlBlock {
2 // immutable
|
3 pub pid: PidHandle,
|
4 // mutable
|
5 inner: UPSafeCell,
|
6}
|
7
|
8pub struct ProcessControlBlockInner {
|
9 ...
|
|
10 pub tasks: Vec |
11 pub task_res_allocator: RecycleAllocator,
|
12}
|
|
|
线程与处理器管理结构taskmanager
负责管理所有线程
|
处理器管理结构 Processor 负责维护 CPU 状态、调度和特权级切换等事务。
|
|
其数据结构与之前章节中进程的处理器管理结构完全一样。
|
|
但在相关方法上面,由于多个线程有各自的用户栈和跳板页,所以有些不同
|
|
|
|
线程管理机制的设计与实现
线程创建、线程退出与等待线程结束
线程创建
|
一个进程process执行中发出了创建线程的系统调用 sys_thread_create 后
|
操作系统就需要在当前进程的基础上创建一个线程了
|
重点是需要了解创建线程控制块
|
在线程控制块中初始化各个成员变量,
|
建立好进程和线程的关系等。
|
正确运行所需的重要的执行环境要素
线程的用户态栈
|
确保在用户态的线程能正常执行函数调用;
|
线程的内核态栈
|
确保线程陷入内核后能正常执行函数调用;
|
线程的跳板页
|
确保线程能正确的进行用户态<–>内核态切换;
|
线程上下文
|
即线程用到的寄存器信息,用于线程切换。
|
|
pub fn sys_thread_create(entry: usize, arg: usize) -> isize {
找到当前正在执行的线程 task 和此线程所属的进程 process 。
|
调用 TaskControlBlock::new 方法,创建一个新的线程 new_task ,
|
在创建过程中,建立与进程 process 的所属关系,分配了线程用户态栈、内核态栈、用于异常/中断的跳板页。
|
|
把线程挂到调度队列中。
|
把线程接入到所需进程的线程列表 tasks 中。
|
初始化位于该线程在用户态地址空间中的 Trap 上下文:
设置线程的函数入口点和用户栈,
|
使得第一次进入用户态时能从线程起始位置开始正确执行;
|
设置好内核栈和陷入函数指针trap_handler ,
|
保证在 Trap 的时候用户态的线程能正确进入内核态。
|
|
|
|
线程退出
|
当一个非主线程的其他线程发出 sys_exit 系统调用时
|
内核会调用 exit_current_and_run_next 函数退出当前线程并切换到下一个线程,
|
但不会导致其所属进程的退出。
|
|
当 主线程 即进程发出这个系统调用,
|
内核会回收整个进程(这包括了其管理的所有线程)资源,并退出
|
|
pub fn sys_exit(exit_code: i32) -> ! {
|
pub fn exit_current_and_run_next(exit_code: i32) {
回收线程的各种资源。
|
如果是主线程发出的退去请求,则回收整个进程的部分资源,并退出进程。
|
所做的事情是将当前进程的所有子进程挂在初始进程 INITPROC 下面,
做法是遍历每个子进程, 修改其父进程为初始进程,
|
并加入初始进程的孩子向量中。
|
|
将当前进程的孩子向量清空。
|
进行线程调度切换。
|
很大一部分与第五章讲解的 进程的退出 的功能实现大致相同
|
|
|
|
等待线程结束
|
主线程通过系统调用 sys_waittid 来等待其他线程的结束
|
pub fn sys_waittid(tid: usize) -> i32 {
如果是线程等自己,返回错误.
|
如果找到 tid 对应的退出线程,则收集该退出线程的退出码 exit_tid ,
|
否则返回错误(退出线程不存在)。
|
|
如果退出码存在,则清空进程中对应此退出线程的线程控制块(至此,线程所占资源算是全部清空了),
|
否则返回错误(线程还没退出)。
|
|
|
|
|
线程执行中的特权级切换和调度切换
线程执行中的特权级切换与第三章中 任务切换的设计与实现 小节中讲解的过程是一致的
|
线程执行中的调度切换过程与第五章的 进程调度机制 小节中讲解的过程是一致的
|
|
|
锁机制
如果多个线程都想读和更新全局数据,
|
那么谁先更新取决于操作系统内核的抢占式调度和分派策略。
|
在一般情况下,每个线程都有可能先执行, 且可能由于中断等因素,随时被操作系统打断其执行,而切换到另外一个线程运行, 形成在一段时间内,多个线程交替执行的现象。
|
如果没有一些保障机制(比如互斥、同步等),
|
那么这些对共享数据进行读写的交替执行的线程,
|
其期望的共享数据的正确结果可能无法达到。
|
|
所以,我们需要研究一种保障机制 — 锁 ,
|
确保无论操作系统如何抢占线程,调度和切换线程的执行,
|
都可以保证对拥有锁的线程,
|
可以独占地对共享数据进行读写,
|
从而能够得到正确的共享数据结果。
|
|
这种机制的能力来自于处理器的指令、
|
操作系统系统调用的基本支持,
|
从而能够保证线程间互斥地读写共享数据。
|
|
|
为什么需要锁
简单例子
|
第 4 行代码,一般人理解处理器会一次就执行完这条简单的语句
|
实际情况并不是这样。
|
|
1// 线程的入口函数
|
2int a=0;
|
3void f() {
|
4 a = a + 1;
|
5}
|
|
GCC 编译出上述函数的汇编码
|
riscv64-unknown-elf-gcc -o f.s -S f.c
|
|
看到生成的汇编代码如下:
|
对于高级语言的一条简单语句(C 代码的第 4 行,对全局变量进行读写)
|
很可能是由多条汇编代码 (汇编代码的第 18~23 行)组成
|
如果这个函数是多个线程要执行的函数,那么在上述汇编代码第 18 行到第 23 行中的各行之间,可能会发生中断,
|
从而导致操作系统执行抢占式的线程调度和切换, 就会得到不一样的结果。
|
|
由于执行这段汇编代码(第 18~23 行))的多个线程在访问全局变量过程中可能导致竞争状态,
|
|
因此我们将此段代码称为临界区(critical section)
|
临界区(critical section)
|
临界区是访问共享变量(或共享资源)的代码片段, 不能由多个线程同时执行,即需要保证互斥。
|
|
|
|
...
|
18 lui a5,%hi(a)
|
19 lw a5,%lo(a)(a5)
|
20 addiw a5,a5,1
|
21 sext.w a4,a5
|
22 lui a5,%hi(a)
|
23 sw a4,%lo(a)(a5)
|
...
|
|
下面是有两个线程T0、T1在一个时间段内的一种可能的执行情况:
|
一般情况下,线程 T0 执行完毕后,再执行线程 T1,那么共享全局变量 a 的值为 2 。
|
但在上面的执行过程中, 可以看到在线程执行指令的过程中会发生线程切换,这样在时刻 10 的时候,共享全局变量 a 的值为 1 , 这不是我们预期的结果。
|
出现这种情况的原因是两个线程在操作系统的调度下(在哪个时刻调度具有不确定性), 交错执行 a = a + 1 的不同汇编指令序列,导致虽然增加全局变量 a 的代码被执行了两次, 但结果还是只增加了 1 。
|
这种多线程的最终执行结果不确定(indeterminate),
|
|
取决于由于调度导致的、 不确定指令执行序列的情况
|
|
就是竞态条件(race condition)。
|
就是竞态条件(race condition)。
|
|
如果每个线程在执行 a = a + 1 这个 C 语句所对应多条汇编语句过程中,不会被操作系统切换,
|
那么就不会出现多个线程交叉读写全局变量的情况,也就不会出现结果不确定的问题了。
|
|
访问(特指写操作)共享变量代码片段,不能由多个线程同时执行(即并行)或者在一个时间段内都去执行 (即并发)。
|
|
要做到这一点,需要互斥机制的保障。
|
|
从某种角度上看,这种互斥性也是一种原子性, 即线程在临界区的执行过程中,不会出现只执行了一部分,就被打断并切换到其他线程执行的情况。
|
原子性
|
即, 要么线程执行的这一系列操作/指令都完成,要么这一系列操作/指令都不做,不会出现指令序列执行中被打断的情况。
|
|
|
|
|
|
锁的基本思路
保证多线程并发执行中的临界区的代码具有互斥性或原子性
|
建立一种锁
|
只有拿到锁的线程才能在临界区中执行
|
|
|
1lock(mutex); // 尝试取锁
|
2a = a + 1; // 临界区,访问临界资源 a
|
3unlock(mutex); // 是否锁
|
4... // 剩余区
|
|
|
对于一个应用程序而言,它的执行是受到其执行环境的管理和限制的
|
执行环境的主要组成就是用户态的系统库、 操作系统和更底层的处理器
|
说明我们需要有硬件和操作系统来对互斥进行支持。
|
|
一个自然的想法是,这个 lock/unlock 互斥操作就是CPU提供的机器指令
|
但需要注意,这里互斥的对象是线程的临界区代码,
|
而临界区代码可以访问各种共享变量(简称临界资源)。
|
只靠两条机器指令,难以识别各种共享变量,
|
不太可能约束可能在临界区的各种指令执行共享变量操作的互斥性。
|
|
所以,我们还是需要有一些相对更灵活和复杂一点的方法,能够设置一种所有线程能看到的标记,
|
在一个能进入临界区的线程设置好这个标记后,其他线程都不能再进入临界区了。
|
总体上看, 对临界区的访问过程分为四个部分:
|
尝试取锁:
查看锁是否可用,即临界区是否可访问(看占用临界区标志是否被设置),
|
如果可以访问, 则设置占用临界区标志(锁不可用)并转到步骤 2 ,
|
否则线程忙等或被阻塞;
|
|
临界区:
|
释放锁:
清除占用临界区标志(锁可用),
|
如果有线程被阻塞,会唤醒阻塞线程;
|
|
剩余区:
|
|
|
可以看到锁机制有两种:
让线程忙等的忙等锁(spin lock),
|
以及让线程阻塞的睡眠锁 (sleep lock)。
|
|
锁的实现大体上基于三类机制:
|
我们还需要知道如何评价各种锁实现的效果。
|
一般我们需要关注锁的三种属性:
|
互斥性(mutual exclusion),
即锁是否能够有效阻止多个线程进入临界区,这是最基本的属性。
|
|
公平性(fairness)
|
性能(performance),
|
|
|
|
|
内核态操作系统级方法实现锁 — mutex 系统调用
轻量的可睡眠锁
|
让等待锁的线程睡眠,让释放锁的线程显式地唤醒等待锁的线程
|
如果有多个等待锁的线程,可以全部释放,让大家再次竞争锁
|
也可以只释放最早等待的那个线程。
|
这就需要更多的操作系统支持,特别是需要一个等待队列来保存等待锁的线程。
|
|
多线程应用程序如何使用mutex系统调用的
// user/src/bin/race_adder_mutex_blocking.rs
创建了一个ID为 0 的互斥锁,对应的是第32行 SYSCALL_MUTEX_CREATE 系统调用;
|
尝试获取锁(对应的是第35行 SYSCALL_MUTEX_LOCK 系统调用),
|
如果取得锁, 将继续向下执行临界区代码;如果没有取得锁,将阻塞;
|
|
释放锁(对应的是第38行 SYSCALL_MUTEX_UNLOCK 系统调用),
|
如果有等待在该锁上的线程, 则唤醒这些等待线程。
|
|
|
|
实现
数据结构
|
1pub struct ProcessControlBlock {
|
|
2 // immutable
|
|
3 pub pid: PidHandle,
|
|
4 // mutable
|
|
5 inner: UPSafeCell,
|
|
6}
|
|
7pub struct ProcessControlBlockInner {
|
|
8 ...
|
|
|
|
|
|
|
|
9 pub mutex_list: Vec |
在线程的眼里,互斥是一种每个线程能看到的资源,
|
且在一个进程中,可以存在多个不同互斥资源,
|
所以我们可以把所有的互斥资源放在一起让进程来管理,
|
实现了 Mutex trait 的一个“互斥资源”的向量
|
|
10}
|
|
11pub trait Mutex: Sync + Send {
|
|
12 fn lock(&self);
|
|
13 fn unlock(&self);
|
|
14}
|
|
15pub struct MutexBlocking {
|
会实现 Mutex trait 的内核数据结构
|
就是我们提到的 互斥资源 即 互斥锁
|
|
16 inner: UPSafeCell,
|
|
17}
|
|
18pub struct MutexBlockingInner {
|
|
19 locked: bool,
|
|
20 wait_queue: VecDeque>,
|
操作系统需要显式地施加某种控制,来确定当一个线程释放锁时,等待的线程谁将能抢到锁。
|
为了做到这一点,操作系统需要有一个等待队列来保存等待锁的线程
|
|
21}
|
|
|
|
操作系统中,需要设计实现三个核心成员变量
互斥锁的成员变量有两个:
|
表示是否锁上的 locked 和管理等待线程的等待队列 wait_queue;
|
进程的成员变量:
|
锁向量 mutex_list 。
|
|
创建一个互斥锁,
|
pub fn sys_mutex_create(blocking: bool) -> isize {
Some(Arc::new(MutexBlocking::new()))
|
如果向量中有空的元素,就在这个空元素的位置创建一个可睡眠的互斥锁;
|
process_inner.mutex_list.push(Some(Arc::new(MutexSpin::new())));
|
如果向量满了,就在向量中添加新的可睡眠的互斥锁;
|
|
上锁
在锁已被其他线程获取的情况下,把当前线程放到等待队列中, 并调度一个新线程执行。
|
|
pub fn sys_mutex_lock(mutex_id: usize) -> isize {
mutex.lock();
|
调用 ID 为 mutex_id 的互斥锁 mutex 的 lock 方法,具体工作由该方法来完成。
|
|
impl Mutex for MutexBlocking {
fn lock(&self) {
16 if mutex_inner.locked {
|
17 mutex_inner.wait_queue.push_back(current_task().unwrap());
|
如果互斥锁 mutex 已经被其他线程获取了,那么在第 17 行,将把当前线程放入等待队列中;
|
19 block_current_and_run_next();
|
在第 19 行,让当前线程处于等待状态,并调度其他线程执行。
|
21 mutex_inner.locked = true;
|
如果互斥锁 mutex 还没被获取,那么当前线程会获取给互斥锁,并返回系统调用。
|
|
|
释放锁
如果有等待在这个互斥锁上的线程,需要唤醒最早等待的线程
|
|
pub fn sys_mutex_unlock(mutex_id: usize) -> isize {
8 mutex.unlock();
|
调用 ID 为 mutex_id 的互斥锁 mutex 的 unlock 方法,具体工作由该方法来完成的。
|
|
impl Mutex for MutexBlocking {
fn unlock(&self) {
17 if let Some(waking_task) = mutex_inner.wait_queue.pop_front() {
|
18 add_task(waking_task);
|
如果有等待的线程,唤醒等待最久的那个线程,相当于将锁的所有权移交给该线程。
|
20 mutex_inner.locked = false;
|
若没有线程等待,则释放锁。
|
|
|
|
|
|
|
信号量机制
当我们需要更灵活的互斥访问或同步操作方式,如提供了最多只允许 N 个线程访问临界资源的情况
|
让某个线程等待另外一个线程执行完毕后再继续执行的同步过程等, 互斥锁这种方式就有点力不从心了
|
|
信号量的实现需要互斥锁和处理器原子指令的支持, 它是一种更高级的同步互斥机制。
|
|
信号量的起源和基本思路
Edsger Dijkstra
|
信号量 (Semphore)是一种变量或抽象数据类型,用于控制多个线程对共同资源的访问。
|
|
信号量是对互斥锁的一种巧妙的扩展
|
互斥锁的初始值一般设置为 1 的整型变量, 表示临界区还没有被某个线程占用
|
互斥锁用 0 表示临界区已经被占用了,再通过 lock/unlock 操作来协调多个线程轮流独占临界区执行。
|
|
而信号量的初始值可设置为 N 的整数变量,
|
如果 N 大于 0, 表示最多可以有 N 个线程进入临界区执行,
|
如果 N 小于等于 0 ,表示不能有线程进入临界区了,
|
必须在后续操作中让信号量的值加 1 ,
|
才能唤醒某个等待的线程。
|
|
Dijkstra 对信号量设计了两种操作:
P(Proberen(荷兰语),尝试)操作和
|
检查信号量的值是否大于 0,若该值大于 0,则将其值减 1 并继续(表示可以进入临界区了);
|
|
|
注意,此时 P 操作还未结束。
|
而且由于信号量本身是一种临界资源(可回想一下上一节的锁, 其实也是一种临界资源),
|
所以在 P 操作中,检查/修改信号量值以及可能发生的睡眠这一系列操作,
|
是一个不可分割的原子操作过程。
|
|
通过原子操作才能保证,一旦 P 操作开始,则在该操作完成或阻塞睡眠之前, 其他线程均不允许访问该信号量。
|
|
V(Verhogen(荷兰语),增加)操作。
|
V 操作会对信号量的值加 1 ,然后检查是否有一个或多个线程在该信号量上睡眠等待。
|
如有, 则选择其中的一个线程唤醒并允许该线程继续完成它的 P 操作;
|
如没有,则直接返回。
|
|
注意,信号量的值加 1, 并可能唤醒一个线程的一系列操作同样也是不可分割的原子操作过程。
|
不会有某个进程因执行 V 操作而阻塞。
|
|
|
|
如果信号量是一个任意的整数,通常被称为计数信号量(Counting Semaphore),或一般信号量(General Semaphore);
|
如果信号量只有0或1的取值,则称为二值信号量(Binary Semaphore)。
|
信号量很好地解决了最多允许 N 个线程访问临界资源的情况。
|
|
伪代码
1fn P(S) {
|
|
2 if S >= 1
|
S 的取值范围为大于等于 0 的整数。
|
S 的初值一般设置为一个大于 0 的正整数, 表示可以进入临界区的线程数。
|
当 S 取值为 1,表示是二值信号量,也就是互斥锁了。
|
|
3 S = S - 1;
|
|
4 else
|
|
5 ;
|
|
6}
|
|
7fn V(S) {
|
|
8 if
|
|
9 ;
|
|
10 else
|
|
11 S = S + 1;
|
|
12}
|
|
|
使用信号量
1let static mut S: semaphore = 1;
|
2
|
3// Thread i
|
4fn foo() {
|
5 ...
|
6 P(S);
|
7 execute Cricital Section;
|
8 V(S);
|
9 ...
|
10}
|
|
另外一种信号量实现的伪代码:
1fn P(S) {
|
|
2 S = S - 1;
|
S 的初值一般设置为一个大于 0 的正整数,
|
表示可以进入临界区的线程数。
|
但 S 的取值范围可以是小于 0 的整数
|
表示等待进入临界区的睡眠线程数。
|
|
3 if S < 0 then
|
|
4 ;
|
|
5}
|
|
6
|
|
7fn V(S) {
|
|
8 S = S + 1;
|
|
9 if
|
|
10 ;
|
|
11}
|
|
|
另一种用途是用于实现同步(synchronization)。
比如,把信号量的初始值设置为 0 ,
|
当一个线程 A 对此信号量执行一个 P 操作,
|
那么该线程立即会被阻塞睡眠。
|
之后有另外一个线程 B 对此信号量执行一个 V 操作,
|
就会将线程 A 唤醒。
|
|
这样线程 B 中执行 V 操作之前的代码序列 B-stmts 和线程 A 中执行 P 操作之后的代码 A-stmts 序列之间就形成了一种确定的同步执行关系,
|
即线程 B 的 B-stmts 会先执行,然后才是线程 A 的 A-stmts 开始执行。
|
|
|
伪代码
1let static mut S: semaphore = 0;
|
2
|
3//Thread A
|
4...
|
5P(S);
|
6Label_2:
|
7A-stmts after Thread B::Label_1;
|
8...
|
9
|
10//Thread B
|
11...
|
12B-stmts before Thread A::Label_2;
|
13Label_1:
|
14V(S);
|
15...
|
|
|
实现信号量
使用 semaphore 系统调用
主线程先创建了信号量初值为 0 的信号量 SEM_SYNC
|
然后再创建两个线程 First 和 Second
|
线程 First 会先睡眠 10ms,而当线程 Second 执行时,会由于执行信号量的 P 操作而等待睡眠;
|
当线程 First 醒来后,会执行 V 操作,从而能够唤醒线程 Second。
|
这样线程 First 和线程 Second 就形成了一种稳定的同步关系。
|
|
1const SEM_SYNC: usize = 0; //信号量ID
|
|
2unsafe fn first() -> ! {
|
|
3 sleep(10);
|
|
4 println!("First work and wakeup Second");
|
|
5 semaphore_up(SEM_SYNC); //信号量V操作
|
第 5 行,线程 First 执行信号量 V 操作
|
(对应第 25 行 SYSCALL_SEMAPHORE_UP 系统调用), 会唤醒等待该信号量的线程 Second。
|
|
6 exit(0)
|
|
7}
|
|
8unsafe fn second() -> ! {
|
|
9 println!("Second want to continue,but need to wait first");
|
|
10 semaphore_down(SEM_SYNC); //信号量P操作
|
线程 Second 执行信号量 P 操作
|
(对应第 28行 SYSCALL_SEMAPHORE_DOWN 系统调用),由于信号量初值为 0 ,该线程将阻塞;
|
|
11 println!("Second can work now");
|
|
12 exit(0)
|
|
13}
|
|
14pub fn main() -> i32 {
|
|
15 // create semaphores
|
|
16 assert_eq!(semaphore_create(0) as usize, SEM_SYNC); // 信号量初值为0
|
创建了一个初值为 0 ,ID 为 SEM_SYNC 的信号量,
|
对应的是第 22 行 SYSCALL_SEMAPHORE_CREATE 系统调用;
|
|
17 // create first, second threads
|
|
18 ...
|
|
19}
|
|
20
|
|
21pub fn sys_semaphore_create(res_count: usize) -> isize {
|
|
22 syscall(SYSCALL_SEMAPHORE_CREATE, [res_count, 0, 0])
|
|
23}
|
|
24pub fn sys_semaphore_up(sem_id: usize) -> isize {
|
|
25 syscall(SYSCALL_SEMAPHORE_UP, [sem_id, 0, 0])
|
|
26}
|
|
27pub fn sys_semaphore_down(sem_id: usize) -> isize {
|
|
28 syscall(SYSCALL_SEMAPHORE_DOWN, [sem_id, 0, 0])
|
|
29}
|
|
|
|
实现 semaphore 系统调用
1pub struct ProcessControlBlock {
|
|
2 // immutable
|
|
3 pub pid: PidHandle,
|
|
4 // mutable
|
|
5 inner: UPSafeCell,
|
|
6}
|
|
7pub struct ProcessControlBlockInner {
|
|
8 ...
|
|
9 pub semaphore_list: Vec |
在线程的眼里,信号量是一种每个线程能看到的共享资源,
|
且在一个进程中,可以存在多个不同信号量资源,
|
所以我们可以把所有的信号量资源放在一起让进程来管理,
|
semaphore_list: Vec |
|
10}
|
|
11
|
|
12pub struct Semaphore {
|
Semaphore 是信号量的内核数据结构,由信号量值和等待队列组成。
|
13 pub inner: UPSafeCell,
|
|
14}
|
|
15pub struct SemaphoreInner {
|
|
16 pub count: isize,
|
信号量的核心数据成员:信号量值和等待队列。
|
17 pub wait_queue: VecDeque>,
|
|
18}
|
|
19impl Semaphore {
|
|
20 pub fn new(res_count: usize) -> Self {
|
创建信号量,信号量初值为参数 res_count 。
|
21 Self {
|
|
22 inner: unsafe { UPSafeCell::new(
|
|
23 SemaphoreInner {
|
|
24 count: res_count as isize,
|
|
25 wait_queue: VecDeque::new(),
|
|
26 }
|
|
27 )},
|
|
28 }
|
|
29 }
|
|
30
|
|
31 pub fn up(&self) {
|
V 操作是由 Semaphore 的 up 方法实现
|
32 let mut inner = self.inner.exclusive_access();
|
|
33 inner.count += 1;
|
|
34 if inner.count <= 0 {
|
当信号量值小于等于 0 时, 将从信号量的等待队列中弹出一个线程放入线程就绪队列。
|
35 if let Some(task) = inner.wait_queue.pop_front() {
|
|
36 add_task(task);
|
|
37 }
|
|
38 }
|
|
39 }
|
|
40
|
|
41 pub fn down(&self) {
|
P 操作是由 Semaphore 的 down 方法实现
|
42 let mut inner = self.inner.exclusive_access();
|
|
43 inner.count -= 1;
|
|
44 if inner.count < 0 {
|
当信号量值小于 0 时, 将把当前线程放入信号量的等待队列,设置当前线程为挂起状态并选择新线程执行。
|
45 inner.wait_queue.push_back(current_task().unwrap());
|
|
46 drop(inner);
|
|
47 block_current_and_run_next();
|
|
48 }
|
|
49 }
|
|
50}
|
|
|
|
|
|
条件变量机制
为了简化编程、避免错误, 计算机科学家针对某些情况设计了一种更高层的同步互斥原语。
|
具体而言,在有些情况下, 线程需要检查某一条件(condition)满足之后,才会继续执行。
|
|
来看一个例子,
有两个线程 first 和 second 在运行,
|
线程 first 会把全局变量 A 设置为 1,
|
而线程 second 在 A != 0 的条件满足后,才能继续执行,
|
如果线程 second 先执行,会忙等在 while 循环中,在操作系统的调度下,线程 first 会执行并把 A 赋值为 1 后,然后线程 second 再次执行时,就会跳出 while 循环,进行接下来的工作。
|
配合互斥锁,可以正确完成上述带条件的同步流程,如下面的伪代码所示:
|
实现能执行,但效率低下,因为线程 second 会忙等检查,浪费处理器时间。
|
|
1static mut A: usize = 0;
|
2unsafe fn first() -> ! {
|
3 mutex.lock();
|
4 A=1;
|
5 mutex.unlock();
|
6 ...
|
7}
|
8
|
9unsafe fn second() -> ! {
|
10 mutex.lock();
|
11 while A==0 {
|
12 mutex.unlock();
|
13 // give other thread a chance to lock
|
14 mutex.lock();
|
15 };
|
16 mutex.unlock();
|
17 //继续执行相关事务
|
18}
|
|
我们希望有某种方式让线程 second 休眠,直到等待的条件满足,再继续执行。于是,我们可以写出如下的代码:
|
确实,线程 second 是带着上锁的 mutex 进入等待睡眠状态的。
|
如果这两个线程的调度顺序是先执行线程 second,再执行线程first,那么线程 second 会先睡眠且拥有 mutex 的锁;
|
当线程 first 执行时,会由于没有 mutex 的锁而进入等待锁的睡眠状态。
|
结果就是两个线程都睡了,都执行不下去,这就出现了 死锁 。
|
|
1static mut A: usize = 0;
|
2unsafe fn first() -> ! {
|
3 mutex.lock();
|
4 A=1;
|
5 wakup(second);
|
6 mutex.unlock();
|
7 ...
|
8}
|
9
|
10unsafe fn second() -> ! {
|
11 mutex.lock();
|
12 while A==0 {
|
13 wait();
|
14 };
|
15 mutex.unlock();
|
16 //继续执行相关事务
|
17}
|
|
这里需要解决的两个关键问题
如何等待一个条件?
|
在条件为真时如何向等待线程发出信号 。
|
|
|
|
我们的计算机科学家给出了 管程(Monitor) 和 条件变量(Condition Variables) 这种巧妙的方法。
|
|
|
|
条件变量的基本思路
管程Monitor有一个很重要的特性,
|
即任一时刻只能有一个活跃线程调用管程中的过程,
|
这一特性使线程在调用执行管程中过程时能保证互斥,
|
这样线程就可以放心地访问共享变量。
|
|
|
管程是编程语言的组成部分,
|
编译器知道其特殊性,
|
因此可以采用与其他过程调用不同的方法来处理对管程的调用.
|
因为是由编译器而非程序员来生成互斥相关的代码,所以出错的可能性要小。
|
|
|
管程虽然借助编译器提供了一种实现互斥的简便途径,但这还不够,还需要一种线程间的沟通机制。
|
|
首先是等待机制:
由于线程在调用管程中某个过程时,发现某个条件不满足,那就在无法继续运行而被阻塞。
|
|
|
其次是唤醒机制:
另外一个线程可以在调用管程的过程中,把某个条件设置为真,
|
并且还需要有一种机制, 及时唤醒等待条件为真的阻塞线程。
|
|
|
为了避免管程中同时有两个活跃线程,
|
|
我们需要一定的规则来约定线程发出唤醒操作的行为。
|
|
目前有三种典型的规则方案:
Hoare 语义:
|
线程发出唤醒操作后,马上阻塞自己,让新被唤醒的线程运行。
|
注:此时唤醒线程的执行位置还在管程中。
|
|
Hansen 语义:
|
执行唤醒操作的线程必须立即退出管程,
|
即唤醒操作只可能作为一个管程过程的最后一条语句。
|
注:此时唤醒线程的执行位置离开了管程。
|
|
Mesa 语义:
|
唤醒线程在发出唤醒操作后继续运行,
|
并且只有它退出管程之后,才允许等待的线程开始运行。
|
注:此时唤醒线程的执行位置还在管程中。
|
|
|
|
下面介绍一个基于 Mesa 语义的沟通机制。
|
这种沟通机制的具体实现就是 条件变量 和对应的操作:
|
|
|
当条件不满足时,线程通过执行条件变量的 wait 操作就可以把自己加入到等待队列中,睡眠等待(waiting)该条件。
|
|
另外某个线程,当它改变条件为真后,
|
就可以通过条件变量的 signal 操作来唤醒一个或者多个等待的线程(通过在该条件上发信号),让它们继续执行。
|
|
|
|
早期提出的管程是基于 Concurrent Pascal 来设计的
|
其他语言如 C 和 Rust 等,并没有在语言上支持这种机制。
|
我们还是可以用手动加入互斥锁的方式来代替编译器,就可以在 C 和 Rust 的基础上实现原始的管程机制了。
|
在目前的 C 语言应用开发中,实际上也是这么做的。
|
这样,我们就可以用互斥锁和条件变量, 来重现上述的同步互斥例子:
|
|
1static mut A: usize = 0;
|
2unsafe fn first() -> ! {
|
3 mutex.lock();
|
4 A=1;
|
5 condvar.wakup();
|
6 mutex.unlock();
|
7 ...
|
8}
|
9
|
10unsafe fn second() -> ! {
|
11 mutex.lock();
|
12 while A==0 {
|
13 condvar.wait(mutex); //在睡眠等待之前,需要释放mutex
|
14 };
|
15 mutex.unlock();
|
16 //继续执行相关事务
|
17}
|
|
实现
|
wait三步
1. 释放锁;
|
2. 把自己挂起;
|
3. 被唤醒后,再获取锁。
|
|
条件变量的 signal 操作只包含一步:
|
|
1fn wait(mutex) {
|
2 mutex.unlock();
|
3 ;
|
4 mutex.lock();
|
5}
|
6
|
7fn signal() {
|
8 ;
|
9}
|
|
注意,。
条件变量不像信号量那样有一个整型计数值的成员变量,
|
所以条件变量也不能像信号量那样有读写计数值的能力。
|
|
如果一个线程向一个条件变量发送唤醒操作,
|
但是在该条件变量上并没有等待的线程,
|
则唤醒操作实际上什么也没做
|
|
|
|
|
实现条件变量
使用 condvar 系统调用
|
面向应用程序对条件变量系统调用的简单使用
|
它的使用与上一节介绍的信号量系统调用类似
|
主线程先创建了初值为 1 的互斥锁和一个条件变量,
|
然后再创建两个线程 First 和 Second
|
|
线程 First 会先睡眠 10ms,
|
而当线程 Second 执行时,会由于条件不满足执行条件变量的 wait 操作而等待睡眠;
|
|
当线程 First 醒来后,通过设置 A 为 1,让线程 second 等待的条件满足,然后会执行条件变量的 signal 操作,从而能够唤醒线程 Second。
|
|
这样线程 First 和线程 Second 就形成了一种稳定的同步与互斥关系。
|
|
1static mut A: usize = 0; //全局变量
|
|
2
|
|
3const CONDVAR_ID: usize = 0;
|
|
4const MUTEX_ID: usize = 0;
|
|
5
|
|
6unsafe fn first() -> ! {
|
|
7 sleep(10);
|
|
8 println!("First work, Change A --> 1 and wakeup Second");
|
|
9 mutex_lock(MUTEX_ID);
|
|
10 A=1;
|
|
11 condvar_signal(CONDVAR_ID);
|
线程 First 执行条件变量 signal 操作
|
(对应第 36 行 SYSCALL_CONDVAR_SIGNAL 系统调用),
|
会唤醒等待该条件变量的线程 Second。
|
|
12 mutex_unlock(MUTEX_ID);
|
|
13 ...
|
|
14}
|
|
15unsafe fn second() -> ! {
|
|
16 println!("Second want to continue,but need to wait A=1");
|
|
17 mutex_lock(MUTEX_ID);
|
|
18 while A==0 {
|
|
19 condvar_wait(CONDVAR_ID, MUTEX_ID);
|
线程 Second 执行条件变量 wait 操作
|
(对应第 39 行 SYSCALL_CONDVAR_WAIT 系统调用)
|
该线程将释放 mutex 锁并阻塞;
|
|
20 }
|
|
21 mutex_unlock(MUTEX_ID);
|
|
22 ...
|
|
23}
|
|
24pub fn main() -> i32 {
|
|
25 // create condvar & mutex
|
|
26 assert_eq!(condvar_create() as usize, CONDVAR_ID);
|
创建了一个 ID 为 CONDVAR_ID 的条件量,
|
对应第 33 行 SYSCALL_CONDVAR_CREATE 系统调用;
|
|
27 assert_eq!(mutex_blocking_create() as usize, MUTEX_ID);
|
|
28 // create first, second threads
|
|
29 ...
|
|
30}
|
|
31
|
|
32pub fn condvar_create() -> isize {
|
|
33 sys_condvar_create(0)
|
|
34}
|
|
35pub fn condvar_signal(condvar_id: usize) {
|
|
36 sys_condvar_signal(condvar_id);
|
|
37}
|
|
38pub fn condvar_wait(condvar_id: usize, mutex_id: usize) {
|
|
39 sys_condvar_wait(condvar_id, mutex_id);
|
|
40}
|
|
|
|
实现 condvar 系统调用
|
1pub struct ProcessControlBlock {
|
|
2 // immutable
|
|
3 pub pid: PidHandle,
|
|
4 // mutable
|
|
5 inner: UPSafeCell,
|
|
6}
|
|
7pub struct ProcessControlBlockInner {
|
|
8 ...
|
|
9 pub condvar_list: Vec |
在线程的眼里,条件变量是一种每个线程能看到的共享资源,
|
且在一个进程中,可以存在多个不同条件变量资源,
|
所以我们可以把所有的条件变量资源放在一起让进程来管理
|
condvar_list: Vec |
|
10}
|
|
11pub struct Condvar {
|
而 Condvar 是条件变量的内核数据结构,由等待队列组成
|
12 pub inner: UPSafeCell,
|
|
13}
|
|
14pub struct CondvarInner {
|
|
15 pub wait_queue: VecDeque>,
|
|
16}
|
|
17impl Condvar {
|
|
18 pub fn new() -> Self {
|
|
19 Self {
|
|
20 inner: unsafe { UPSafeCell::new(
|
|
21 CondvarInner {
|
|
22 wait_queue: VecDeque::new(),
|
|
23 }
|
|
24 )},
|
|
25 }
|
|
26 }
|
|
27 pub fn signal(&self) {
|
实现 signal 操作,将从条件变量的等待队列中弹出一个线程放入线程就绪队列。
|
|
28 let mut inner = self.inner.exclusive_access();
|
|
29 if let Some(task) = inner.wait_queue.pop_front() {
|
|
30 wakeup_task(task);
|
|
31 }
|
|
32 }
|
|
33 pub fn wait(&self, mutex:Arc) {
|
实现 wait 操作,
|
释放 mutex 互斥锁,
|
将把当前线程放入条件变量的等待队列,
|
设置当前线程为挂起状态并选择新线程执行。
|
在恢复执行后,再加上 mutex 互斥锁。
|
|
34 mutex.unlock();
|
|
35 let mut inner = self.inner.exclusive_access();
|
|
36 inner.wait_queue.push_back(current_task().unwrap());
|
|
37 drop(inner);
|
|
38 block_current_and_run_next();
|
|
39 mutex.lock();
|
|
40 }
|
|
41}
|
|
|
|
|
|