1. 并行基础std::thread 用于创建一个执行的线程实例所以它是一切并发编程的基础使用时需要包含 thread 头文件 它提供了很多基本的线程操作例如 get_id() 来获取所创建线程的线程 ID使用 join() 来加入一个线程等等例如123456789#include iostream#include threadintmain() {std::threadt([](){std::cout hello world. std::endl;});t.join();return0;}2. 互斥量与临界区我们在操作系统、亦或是数据库的相关知识中已经了解过了有关并发技术的基本知识mutex 就是其中的核心之一。 C11 引入了 mutex 相关的类其所有相关的函数都放在 mutex 头文件中。std::mutex 是 C11 中最基本的 mutex 类通过实例化 std::mutex 可以创建互斥量 而通过其成员函数 lock() 可以进行上锁unlock() 可以进行解锁。 但是在实际编写代码的过程中最好不去直接调用成员函数 因为调用成员函数就需要在每个临界区的出口处调用 unlock()当然还包括异常。 这时候 C11 还为互斥量提供了一个 RAII 语法的模板类 std::lock_guard。 RAII 在不失代码简洁性的同时很好的保证了代码的异常安全性。在 RAII 用法下对于临界区的互斥量的创建只需要在作用域的开始部分例如123456789101112131415161718#include iostream#include mutex#include threadintv 1;voidcritical_section(intchange_v) {staticstd::mutex mtx;std::lock_guardstd::mutex lock(mtx);// 执行竞争操作v change_v;// 离开此作用域后 mtx 会被释放}intmain() {std::threadt1(critical_section, 2), t2(critical_section, 3);t1.join();t2.join();std::cout v std::endl;return0;}由于 C 保证了所有栈对象在生命周期结束时会被销毁所以这样的代码也是异常安全的。 无论 critical_section() 正常返回、还是在中途抛出异常都会引发堆栈回退也就自动调用了 unlock()。而 std::unique_lock 则是相对于 std::lock_guard 出现的std::unique_lock 更加灵活 std::unique_lock 的对象会以独占所有权没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权 的方式管理 mutex 对象上的上锁和解锁的操作。所以在并发编程中推荐使用 std::unique_lock。std::lock_guard 不能显式的调用 lock 和 unlock 而 std::unique_lock 可以在声明后的任意位置调用 可以缩小锁的作用范围提供更高的并发度。如果你用到了条件变量 std::condition_variable::wait 则必须使用 std::unique_lock 作为参数。例如123456789101112131415161718192021222324#include iostream#include mutex#include threadintv 1;voidcritical_section(intchange_v) {staticstd::mutex mtx;std::unique_lockstd::mutex lock(mtx);// 执行竞争操作v change_v;std::cout v std::endl;// 将锁进行释放lock.unlock();// 在此期间任何人都可以抢夺 v 的持有权// 开始另一组竞争操作再次加锁lock.lock();v 1;std::cout v std::endl;}intmain() {std::threadt1(critical_section, 2), t2(critical_section, 3);t1.join();t2.join();return0;}3. 期物期物Future表现为 std::future它提供了一个访问异步操作结果的途径这句话很不好理解。 为了理解这个特性我们需要先理解一下在 C11 之前的多线程行为。试想如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务并返回我一个结果。 而这时候线程 A 可能正在忙其他的事情无暇顾及 B 的结果 所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果。在 C11 的 std::future 被引入之前通常的做法是 创建一个线程 A在线程 A 里启动任务 B当准备完毕后发送一个事件并将结果保存在全局变量中。 而主函数线程 A 里正在做其他的事情当需要结果的时候调用一个线程等待函数来获得执行的结果。而 C11 提供的 std::future 简化了这个流程可以用来获取异步任务的结果。 自然地我们很容易能够想象到把它作为一种简单的线程同步手段即屏障barrier。为了看一个例子我们这里额外使用 std::packaged_task它可以用来封装任何可以调用的目标从而用于实现异步的调用。 举例来说1234567891011121314151617#include iostream#include future#include threadintmain() {// 将一个返回值为7的 lambda 表达式封装到 task 中// std::packaged_task 的模板参数为要封装函数的类型std::packaged_taskint() task([](){return7;});// 获得 task 的期物std::futureint result task.get_future();// 在一个线程中执行 taskstd::thread(std::move(task)).detach();std::cout waiting...;result.wait();// 在此设置屏障阻塞到期物的完成// 输出执行结果std::cout done! std:: endl future result is result.get() std::endl;return0;}在封装好要调用的目标后可以使用 get_future() 来获得一个 std::future 对象以便之后实施线程同步。4. 条件变量条件变量 std::condition_variable 是为了解决死锁而生当互斥操作不够用而引入的。 比如线程可能需要等待某个条件为真才能继续执行 而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时就会发生死锁。 所以condition_variable 实例被创建出现主要就是用于唤醒等待线程从而避免死锁。 std::condition_variable的 notify_one() 用于唤醒一个线程 notify_all() 则是通知所有线程。下面是一个生产者和消费者模型的例子1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253#include queue#include chrono#include mutex#include thread#include iostream#include condition_variableintmain() {std::queueint produced_nums;std::mutex mtx;std::condition_variable cv;boolnotified false;// 通知信号// 生产者auto producer []() {for(inti 0; ; i) {std::this_thread::sleep_for(std::chrono::milliseconds(900));std::unique_lockstd::mutex lock(mtx);std::cout producing i std::endl;produced_nums.push(i);notified true;cv.notify_all();// 此处也可以使用 notify_one}};// 消费者auto consumer []() {while(true) {std::unique_lockstd::mutex lock(mtx);while(!notified) {// 避免虚假唤醒cv.wait(lock);}// 短暂取消锁使得生产者有机会在消费者消费空前继续生产lock.unlock();// 消费者慢于生产者std::this_thread::sleep_for(std::chrono::milliseconds(1000));lock.lock();while(!produced_nums.empty()) {std::cout consuming produced_nums.front() std::endl;produced_nums.pop();}notified false;}};// 分别在不同的线程中运行std::threadp(producer);std::threadcs[2];for(inti 0; i 2; i) {cs[i] std::thread(consumer);}p.join();for(inti 0; i 2; i) {cs[i].join();}return0;}值得一提的是在生产者中我们虽然可以使用 notify_one()但实际上并不建议在此处使用 因为在多消费者的情况下我们的消费者实现中简单放弃了锁的持有这使得可能让其他消费者 争夺此锁从而更好的利用多个消费者之间的并发。话虽如此但实际上因为 std::mutex 的排他性 我们根本无法期待多个消费者能真正意义上的并行消费队列的中生产的内容我们仍需要粒度更细的手段。5. 原子操作与内存模型细心的读者可能会对前一小节中生产者消费者模型的例子可能存在编译器优化导致程序出错的情况产生疑惑。 例如布尔值 notified 没有被 volatile 修饰编译器可能对此变量存在优化例如将其作为一个寄存器的值 从而导致消费者线程永远无法观察到此值的变化。这是一个好问题为了解释清楚这个问题我们需要进一步讨论 从 C 11 起引入的内存模型这一概念。我们首先来看一个问题下面这段代码输出结果是多少123456789101112131415161718#include thread#include iostreamintmain() {inta 0;intflag 0;std::threadt1([]() {while(flag ! 1);intb a;std::cout b b std::endl;});std::threadt2([]() {a 5;flag 1;});t1.join();t2.join();return0;}从直观上看t2 中 a 5; 这一条语句似乎总在 flag 1; 之前得到执行而 t1 中 while (flag ! 1) 似乎保证了 std::cout b b std::endl; 不会再标记被改变前执行。从逻辑上看似乎 b 的值应该等于 5。 但实际情况远比此复杂得多或者说这段代码本身属于未定义的行为因为对于 a 和 flag 而言他们在两个并行的线程中被读写 出现了竞争。除此之外即便我们忽略竞争读写仍然可能受 CPU 的乱序执行编译器对指令的重排的影响 导致 a 5 发生在 flag 1 之后。从而 b 可能输出 0。5.1原子操作std::mutex 可以解决上面出现的并发读写的问题但互斥锁是操作系统级的功能 这是因为一个互斥锁的实现通常包含两条基本原理提供线程间自动的状态转换即『锁住』这个状态保障在互斥锁操作期间所操作变量的内存与临界区外进行隔离这是一组非常强的同步条件换句话说当最终编译为 CPU 指令时会表现为非常多的指令我们之后再来看如何实现一个简单的互斥锁。 这对于一个仅需原子级操作没有中间态的变量似乎太苛刻了。关于同步条件的研究有着非常久远的历史我们在这里不进行赘述。读者应该明白现代 CPU 体系结构提供了 CPU 指令级的原子操作 因此在 C11 中多线程下共享变量的读写这一问题上还引入了 std::atomic 模板使得我们实例化一个原子类型将一个 原子类型读写操作从一组指令最小化到单个 CPU 指令。例如1std::atomicint counter;并为整数或浮点数的原子类型提供了基本的数值成员函数举例来说 包括 fetch_add, fetch_sub 等同时通过重载方便的提供了对应的 - 版本。 比如下面的例子1234567891011121314151617#include atomic#include thread#include iostreamstd::atomicint count {0};intmain() {std::threadt1([](){count.fetch_add(1);});std::threadt2([](){count;// 等价于 fetch_addcount 1;// 等价于 fetch_add});t1.join();t2.join();std::cout count std::endl;return0;}当然并非所有的类型都能提供原子操作这是因为原子操作的可行性取决于具体的 CPU 架构以及所实例化的类型结构是否能够满足该 CPU 架构对内存对齐 条件的要求因而我们总是可以通过 std::atomicT::is_lock_free 来检查该原子类型是否需支持原子操作例如123456789101112#include atomic#include iostreamstructA {floatx;inty;longlongz;};intmain() {std::atomicA a;std::cout std::boolalpha a.is_lock_free() std::endl;return0;}5.2一致性模型并行执行的多个线程从某种宏观层面上讨论可以粗略的视为一种分布式系统。 在分布式系统中任何通信乃至本地操作都需要消耗一定时间甚至出现不可靠的通信。如果我们强行将一个变量 v 在多个线程之间的操作设为原子操作即任何一个线程在操作完 v 后 其他线程均能同步感知到 v 的变化则对于变量 v 而言表现为顺序执行的程序它并没有由于引入多线程 而得到任何效率上的收益。对此有什么办法能够适当的加速呢答案便是削弱原子操作的在进程间的同步条件。从原理上看每个线程可以对应为一个集群节点而线程间的通信也几乎等价于集群节点间的通信。 削弱进程间的同步条件通常我们会考虑四种不同的一致性模型线性一致性又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据并且所有线程的操作顺序与全局时钟下的顺序是一致的。x.store(1) x.load()T1 -------------------------------T2 --------------------------------x.store(2)在这种情况下线程 T1, T2 对 x 的两次写操作是原子的且 x.store(1) 是严格的发生在 x.store(2) 之前x.store(2) 严格的发生在 x.load() 之前。 值得一提的是线性一致性对全局时钟的要求是难以实现的这也是人们不断研究比这个一致性更弱条件下其他一致性的算法的原因。顺序一致性同样要求任何一次读操作都能读到数据最近一次写入的数据但未要求与全局时钟的顺序一致。x.store(1) x.store(3) x.load()T1 -----------------------------------T2 -------------------------------------x.store(2)或者x.store(1) x.store(3) x.load()T1 -----------------------------------T2 -------------------------------------x.store(2)在顺序一致性的要求下x.load() 必须读到最近一次写入的数据因此 x.store(2) 与 x.store(1) 并无任何先后保障即 只要 T2 的 x.store(2) 发生在 x.store(3) 之前即可。
C++中的并行与并发基础与使用详解
1. 并行基础std::thread 用于创建一个执行的线程实例所以它是一切并发编程的基础使用时需要包含 thread 头文件 它提供了很多基本的线程操作例如 get_id() 来获取所创建线程的线程 ID使用 join() 来加入一个线程等等例如123456789#include iostream#include threadintmain() {std::threadt([](){std::cout hello world. std::endl;});t.join();return0;}2. 互斥量与临界区我们在操作系统、亦或是数据库的相关知识中已经了解过了有关并发技术的基本知识mutex 就是其中的核心之一。 C11 引入了 mutex 相关的类其所有相关的函数都放在 mutex 头文件中。std::mutex 是 C11 中最基本的 mutex 类通过实例化 std::mutex 可以创建互斥量 而通过其成员函数 lock() 可以进行上锁unlock() 可以进行解锁。 但是在实际编写代码的过程中最好不去直接调用成员函数 因为调用成员函数就需要在每个临界区的出口处调用 unlock()当然还包括异常。 这时候 C11 还为互斥量提供了一个 RAII 语法的模板类 std::lock_guard。 RAII 在不失代码简洁性的同时很好的保证了代码的异常安全性。在 RAII 用法下对于临界区的互斥量的创建只需要在作用域的开始部分例如123456789101112131415161718#include iostream#include mutex#include threadintv 1;voidcritical_section(intchange_v) {staticstd::mutex mtx;std::lock_guardstd::mutex lock(mtx);// 执行竞争操作v change_v;// 离开此作用域后 mtx 会被释放}intmain() {std::threadt1(critical_section, 2), t2(critical_section, 3);t1.join();t2.join();std::cout v std::endl;return0;}由于 C 保证了所有栈对象在生命周期结束时会被销毁所以这样的代码也是异常安全的。 无论 critical_section() 正常返回、还是在中途抛出异常都会引发堆栈回退也就自动调用了 unlock()。而 std::unique_lock 则是相对于 std::lock_guard 出现的std::unique_lock 更加灵活 std::unique_lock 的对象会以独占所有权没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权 的方式管理 mutex 对象上的上锁和解锁的操作。所以在并发编程中推荐使用 std::unique_lock。std::lock_guard 不能显式的调用 lock 和 unlock 而 std::unique_lock 可以在声明后的任意位置调用 可以缩小锁的作用范围提供更高的并发度。如果你用到了条件变量 std::condition_variable::wait 则必须使用 std::unique_lock 作为参数。例如123456789101112131415161718192021222324#include iostream#include mutex#include threadintv 1;voidcritical_section(intchange_v) {staticstd::mutex mtx;std::unique_lockstd::mutex lock(mtx);// 执行竞争操作v change_v;std::cout v std::endl;// 将锁进行释放lock.unlock();// 在此期间任何人都可以抢夺 v 的持有权// 开始另一组竞争操作再次加锁lock.lock();v 1;std::cout v std::endl;}intmain() {std::threadt1(critical_section, 2), t2(critical_section, 3);t1.join();t2.join();return0;}3. 期物期物Future表现为 std::future它提供了一个访问异步操作结果的途径这句话很不好理解。 为了理解这个特性我们需要先理解一下在 C11 之前的多线程行为。试想如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务并返回我一个结果。 而这时候线程 A 可能正在忙其他的事情无暇顾及 B 的结果 所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果。在 C11 的 std::future 被引入之前通常的做法是 创建一个线程 A在线程 A 里启动任务 B当准备完毕后发送一个事件并将结果保存在全局变量中。 而主函数线程 A 里正在做其他的事情当需要结果的时候调用一个线程等待函数来获得执行的结果。而 C11 提供的 std::future 简化了这个流程可以用来获取异步任务的结果。 自然地我们很容易能够想象到把它作为一种简单的线程同步手段即屏障barrier。为了看一个例子我们这里额外使用 std::packaged_task它可以用来封装任何可以调用的目标从而用于实现异步的调用。 举例来说1234567891011121314151617#include iostream#include future#include threadintmain() {// 将一个返回值为7的 lambda 表达式封装到 task 中// std::packaged_task 的模板参数为要封装函数的类型std::packaged_taskint() task([](){return7;});// 获得 task 的期物std::futureint result task.get_future();// 在一个线程中执行 taskstd::thread(std::move(task)).detach();std::cout waiting...;result.wait();// 在此设置屏障阻塞到期物的完成// 输出执行结果std::cout done! std:: endl future result is result.get() std::endl;return0;}在封装好要调用的目标后可以使用 get_future() 来获得一个 std::future 对象以便之后实施线程同步。4. 条件变量条件变量 std::condition_variable 是为了解决死锁而生当互斥操作不够用而引入的。 比如线程可能需要等待某个条件为真才能继续执行 而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时就会发生死锁。 所以condition_variable 实例被创建出现主要就是用于唤醒等待线程从而避免死锁。 std::condition_variable的 notify_one() 用于唤醒一个线程 notify_all() 则是通知所有线程。下面是一个生产者和消费者模型的例子1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253#include queue#include chrono#include mutex#include thread#include iostream#include condition_variableintmain() {std::queueint produced_nums;std::mutex mtx;std::condition_variable cv;boolnotified false;// 通知信号// 生产者auto producer []() {for(inti 0; ; i) {std::this_thread::sleep_for(std::chrono::milliseconds(900));std::unique_lockstd::mutex lock(mtx);std::cout producing i std::endl;produced_nums.push(i);notified true;cv.notify_all();// 此处也可以使用 notify_one}};// 消费者auto consumer []() {while(true) {std::unique_lockstd::mutex lock(mtx);while(!notified) {// 避免虚假唤醒cv.wait(lock);}// 短暂取消锁使得生产者有机会在消费者消费空前继续生产lock.unlock();// 消费者慢于生产者std::this_thread::sleep_for(std::chrono::milliseconds(1000));lock.lock();while(!produced_nums.empty()) {std::cout consuming produced_nums.front() std::endl;produced_nums.pop();}notified false;}};// 分别在不同的线程中运行std::threadp(producer);std::threadcs[2];for(inti 0; i 2; i) {cs[i] std::thread(consumer);}p.join();for(inti 0; i 2; i) {cs[i].join();}return0;}值得一提的是在生产者中我们虽然可以使用 notify_one()但实际上并不建议在此处使用 因为在多消费者的情况下我们的消费者实现中简单放弃了锁的持有这使得可能让其他消费者 争夺此锁从而更好的利用多个消费者之间的并发。话虽如此但实际上因为 std::mutex 的排他性 我们根本无法期待多个消费者能真正意义上的并行消费队列的中生产的内容我们仍需要粒度更细的手段。5. 原子操作与内存模型细心的读者可能会对前一小节中生产者消费者模型的例子可能存在编译器优化导致程序出错的情况产生疑惑。 例如布尔值 notified 没有被 volatile 修饰编译器可能对此变量存在优化例如将其作为一个寄存器的值 从而导致消费者线程永远无法观察到此值的变化。这是一个好问题为了解释清楚这个问题我们需要进一步讨论 从 C 11 起引入的内存模型这一概念。我们首先来看一个问题下面这段代码输出结果是多少123456789101112131415161718#include thread#include iostreamintmain() {inta 0;intflag 0;std::threadt1([]() {while(flag ! 1);intb a;std::cout b b std::endl;});std::threadt2([]() {a 5;flag 1;});t1.join();t2.join();return0;}从直观上看t2 中 a 5; 这一条语句似乎总在 flag 1; 之前得到执行而 t1 中 while (flag ! 1) 似乎保证了 std::cout b b std::endl; 不会再标记被改变前执行。从逻辑上看似乎 b 的值应该等于 5。 但实际情况远比此复杂得多或者说这段代码本身属于未定义的行为因为对于 a 和 flag 而言他们在两个并行的线程中被读写 出现了竞争。除此之外即便我们忽略竞争读写仍然可能受 CPU 的乱序执行编译器对指令的重排的影响 导致 a 5 发生在 flag 1 之后。从而 b 可能输出 0。5.1原子操作std::mutex 可以解决上面出现的并发读写的问题但互斥锁是操作系统级的功能 这是因为一个互斥锁的实现通常包含两条基本原理提供线程间自动的状态转换即『锁住』这个状态保障在互斥锁操作期间所操作变量的内存与临界区外进行隔离这是一组非常强的同步条件换句话说当最终编译为 CPU 指令时会表现为非常多的指令我们之后再来看如何实现一个简单的互斥锁。 这对于一个仅需原子级操作没有中间态的变量似乎太苛刻了。关于同步条件的研究有着非常久远的历史我们在这里不进行赘述。读者应该明白现代 CPU 体系结构提供了 CPU 指令级的原子操作 因此在 C11 中多线程下共享变量的读写这一问题上还引入了 std::atomic 模板使得我们实例化一个原子类型将一个 原子类型读写操作从一组指令最小化到单个 CPU 指令。例如1std::atomicint counter;并为整数或浮点数的原子类型提供了基本的数值成员函数举例来说 包括 fetch_add, fetch_sub 等同时通过重载方便的提供了对应的 - 版本。 比如下面的例子1234567891011121314151617#include atomic#include thread#include iostreamstd::atomicint count {0};intmain() {std::threadt1([](){count.fetch_add(1);});std::threadt2([](){count;// 等价于 fetch_addcount 1;// 等价于 fetch_add});t1.join();t2.join();std::cout count std::endl;return0;}当然并非所有的类型都能提供原子操作这是因为原子操作的可行性取决于具体的 CPU 架构以及所实例化的类型结构是否能够满足该 CPU 架构对内存对齐 条件的要求因而我们总是可以通过 std::atomicT::is_lock_free 来检查该原子类型是否需支持原子操作例如123456789101112#include atomic#include iostreamstructA {floatx;inty;longlongz;};intmain() {std::atomicA a;std::cout std::boolalpha a.is_lock_free() std::endl;return0;}5.2一致性模型并行执行的多个线程从某种宏观层面上讨论可以粗略的视为一种分布式系统。 在分布式系统中任何通信乃至本地操作都需要消耗一定时间甚至出现不可靠的通信。如果我们强行将一个变量 v 在多个线程之间的操作设为原子操作即任何一个线程在操作完 v 后 其他线程均能同步感知到 v 的变化则对于变量 v 而言表现为顺序执行的程序它并没有由于引入多线程 而得到任何效率上的收益。对此有什么办法能够适当的加速呢答案便是削弱原子操作的在进程间的同步条件。从原理上看每个线程可以对应为一个集群节点而线程间的通信也几乎等价于集群节点间的通信。 削弱进程间的同步条件通常我们会考虑四种不同的一致性模型线性一致性又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据并且所有线程的操作顺序与全局时钟下的顺序是一致的。x.store(1) x.load()T1 -------------------------------T2 --------------------------------x.store(2)在这种情况下线程 T1, T2 对 x 的两次写操作是原子的且 x.store(1) 是严格的发生在 x.store(2) 之前x.store(2) 严格的发生在 x.load() 之前。 值得一提的是线性一致性对全局时钟的要求是难以实现的这也是人们不断研究比这个一致性更弱条件下其他一致性的算法的原因。顺序一致性同样要求任何一次读操作都能读到数据最近一次写入的数据但未要求与全局时钟的顺序一致。x.store(1) x.store(3) x.load()T1 -----------------------------------T2 -------------------------------------x.store(2)或者x.store(1) x.store(3) x.load()T1 -----------------------------------T2 -------------------------------------x.store(2)在顺序一致性的要求下x.load() 必须读到最近一次写入的数据因此 x.store(2) 与 x.store(1) 并无任何先后保障即 只要 T2 的 x.store(2) 发生在 x.store(3) 之前即可。