C++ 多线程

  • C++ 多线程封装,以及一些并发处理机制的资料
  • 持续更新ing

参考链接

atomic

  • 所谓原子操作,就是多线程程序中“最小的且不可并行化的”操作。对于在多个线程间共享的一个资源而言,这意味着同一时刻,多个线程中有且仅有一个线程在对这个资源进行操作,即互斥访问。
  • 而关于原子性,我们应当具有一个基本的认知:高级语言层面,单条语句并不能保证对应的操作具有原子性

举例说明

  • 在使用 C、C++、Java 等各种高级语言编写代码时,不少人会下意识的认为一条不可再分的单条语句具有原子性,例如常见 i++。
// 伪码

int i = 0;

void increase() {
  i++;
}

int main() {
  /* 创建两个线程,每个线程循环进行 100 次 increase */
  // 线程 1
  Thread thread1 = new Thread(
    run() {
      for (int i = 0; i < 100; i++) increase();
    }
  );
  // 线程 2
  Thread thread2 = new Thread(
    run() {
      for (int i = 0; i < 100; i++) increase();
    }
  );
}
  • 如果 i++ 是原子操作,则上述伪码中的 i 最终结果为 200。但实际上每次运行结果可能都不相同,且通常小于 200。
  • 之所以出现这样的情况是因为 i++ 在执行时通常还会继续划分为多条 CPU 指令。以 Java 为例,i++ 编译将形成四条字节码指令,以下四条指令是 Java 虚拟机中的字节码指令,字节码指令是 JVM 执行的指令。实际每条字节码指令还可以继续划分为更底层的机器指令。但字节码指令已经足够演示原子性的含义了。如下所示:
// Java 字节码指令
0:  getstatic     # 获取静态属性指令,获取指定类的静态域,并将其值压入栈顶  
1:  iconst_1      # 当int取值-1~5时,JVM采用iconst指令将常量压入栈中,也就是将 1 压入栈中
2:  iadd          # 将栈顶两 int 型数值相加并将结果压入栈顶
3:  putstatic     # 为指定的类的静态域赋值
  • 而上述四条指令的执行并不保证原子性,即执行过程可被打断。考虑如下 CPU 执行序列:
    1. 线程 1 执行 getstatic 指令,获得 i = 1
    2. CPU 切换到线程 2,也执行了 getstatic 指令,获得 i = 1。
    3. CPU 切回线程 1 执行剩下指令,此时 i = 2
    4. CPU 切到线程 2,由于步骤 2 读到的是 i = 1,固执行剩下指令最终只会得到 i = 2

C++ std::atomic

  • std::atomic 为C++11封装的原子数据类型。原子数据类型不会发生数据竞争,能直接用在多线程中而不必我们用户对其进行添加互斥资源锁的类型。从实现上,大家可以理解为这些原子类型内部自己加了锁。
  • 原子类型在头文件 中,使用atomic有两套命名模式:一种是使用替代名称,一种是使用atomic的特化。
    • atomic_bool <=> atomic < bool >
    • atomic_char <=> atomic< char >
    • atomic_int <=> atomic< int >
    • ...
      20210513164522
  • 并非所有的类型都能提供原子操作,这是因为原子操作的可行性取决于 CPU 的架构以及所实例化的类型结构是否满足该架构对内存对齐条件的要求,因而我们总是可以通过 std::atomic::is_lock_free来检查该原子类型是否需支持原子操作。这个函数让用户可以查询某原子类型的操作是直接用的原子指令(x.is_lock_free()返回true), 还是编译器和库内部用了一个锁(x.is_lock_free()返回false)。
template < class T > struct atomic {
    bool is_lock_free() const volatile;
    bool is_lock_free() const;
    void store(T, memory_order = memory_order_seq_cst) volatile;
    void store(T, memory_order = memory_order_seq_cst);
    T load(memory_order = memory_order_seq_cst) const volatile;
    T load(memory_order = memory_order_seq_cst) const;
    operator  T() const volatile;
    operator  T() const;
    T exchange(T, memory_order = memory_order_seq_cst) volatile;
    T exchange(T, memory_order = memory_order_seq_cst);
    bool compare_exchange_weak(T &, T, memory_order, memory_order) volatile;
    bool compare_exchange_weak(T &, T, memory_order, memory_order);
    bool compare_exchange_strong(T &, T, memory_order, memory_order) volatile;
    bool compare_exchange_strong(T &, T, memory_order, memory_order);
    bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst) volatile;
    bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst);
    bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst) volatile;
    bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst);
    atomic() = default;
    constexpr atomic(T);
    atomic(const atomic &) = delete;
    atomic & operator=(const atomic &) = delete;
    atomic & operator=(const atomic &) volatile = delete;
    T operator=(T) volatile;
    T operator=(T);
};

主要操作

load()
  • 加载原子对象中存入的值,等价于直接使用原子变量。
T load (memory_order sync = memory_order_seq_cst) const volatile noexcept;
T load (memory_order sync = memory_order_seq_cst) const noexcept;
  • 读取被封装的值,参数 sync 设置内存序(Memory Order),可能的取值如下:
    • memory_order_relaxed
    • memory_order_consume
    • memory_order_acquire
    • memory_order_seq_cst
store()
  • 存储一个值到原子对象,等价于使用等号。
void store (T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
void store (T val, memory_order sync = memory_order_seq_cst) noexcept;
  • 修改被封装的值,std::atomic::store 函数将类型为 T 的参数 val 复制给原子对象所封装的值。T 是 std::atomic 类模板参数。另外参数 sync 指定内存序(Memory Order),可能的取值:
    • memory_order_relaxed
    • memory_order_release
    • memory_order_seq_cst
exchange() 原子操作
  • 返回原来里面存储的值,然后在存储一个新的值,相当于将上面两个 load() 和 store() 合成起来的参数。
T exchange (T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
T exchange (T val, memory_order sync = memory_order_seq_cst) noexcept;
  • 读取并修改被封装的值,exchange 会将 val 指定的值替换掉之前该原子对象封装的值,并返回之前该原子对象封装的值,整个过程是原子的(因此exchange 操作也称为 read-modify-write 操作)。sync参数指定内存序(Memory Order),可能的取值如下:
    • memory_order_relaxed
    • memory_order_consume
    • memory_order_acquire
    • memory_order_release
    • memory_order_acq_rel
    • memory_order_seq_cst
compare_exchange_weak()
  • 交换-比较操作是比较原子变量值和所提供的期望值,如果二者相等,则存储提供的期望值,如果不等则将期望值更新为原子变量的实际值,更新成功则返回true反之则返回false。
atomic<bool> b;
b.compare_exchange_weak(expected, new_value);
  • 当存储的值和expected相等时则将则更新为new_value,如果不等时则不变。其中expected必须是类型变量,而不能是常量。
compare_exchange_strong()
  • 不像compare_exchange_weak,此版本必须始终true在预期确实与所包含的对象相等时返回,不允许出现虚假故障。但是,在某些计算机上,对于某些在循环中进行检查的算法,compare_exchange_weak 可能会明显改善性能。
  • 其余使用方法和compare_exchange_weak完全一致。
  • 这两个版本的区别是:Weak版本如果数据符合条件被修改,其也可能返回false,就好像不符合修改状态一致;而Strong版本不会有这个问题,但在某些平台上Strong版本比Weak版本慢 [注:在x86平台我没发现他们之间有任何性能差距];绝大多数情况下,我们应该优先选择使用Strong版本;

memory order

  • 在介绍 Memory Order 之前首先介绍 有序性
有序性
  • 上述已经提到 CPU 的一条指令执行时,通常会有多个步骤,如取指IF 即从主存储器中取出指令、ID 译码即翻译指令、EX 执行指令、存储器访问 MEM 取数、WB 写回。
  • 即指令执行将经历:IF、ID、EX、MEM、WB 阶段。
  • 现在考虑 CPU 在执行一条又一条指令时该如何完成上述步骤?最容易想到并是顺序串行,指令 1 依次完成上述五个步骤,完成之后,指令 2 再开始依次完成上述步骤。这种方式简单直接,但执行效率显然存在很大的优化空间。
  • 思考一种流水线工作:
指令1   IF ID EX MEM WB
指令2      IF ID EX MEM WB
指令3         IF ID EX MEM WB
  • 采用这种流水线的工作方式,将避免 CPU 、存储器中各个器件的空闲,从而充分利用每个器件,提升性能。同时注意到由于每条指令执行的情况有所不同,指令执行的先后顺序将会影响到这条流水线的负载情况,而我们的目标则是让整个流水线满载紧凑的运行。
  • 为此 CPU 又实现了「指令重排」技术,CPU 将有选择性的对部分指令进行重排来提高 CPU 执行的性能和效率。例如:
x = 100;    // #1
y = 200;    // #2
z = x + y;  // #3
  • 虽然上述高级语言的语句会编译成多条机器指令,多条机器指令还会进行「指令重排」,#1 语句与 #2 语句完全有可能被 CPU 重新排序,所以程序实际运行时可能会先执行 y = 200; 然后再执行 x = 100;
  • 但另一方面,指令重排的前提是不会影响线程内程序的串行语义,CPU 在重排指令时必须保证线程内语义不变,例如:
x = 0; // #1
x = 1; // #2
y = x; // #3
  • 上述的 y 一定会按照正常的串行逻辑被赋值为 1。
  • 但不幸的是,CPU 只能保证线程内的串行语义。在多线程的视角下,「指令重排」造成的影响需要程序员自己关注。

// 公共资源
int x = 0;
int y = 0;
int z = 0;

Thread_1:             Thread_2:
x = 100;              while (y != 200);
y = 200;              print x
z = x + y;
  • 如果 CPU 不进行「乱序优化」执行,那么 y = 200 时,x 已经被赋值为 100,此时线程 2 输出 x = 200。但实际运行时,线程 1 可能先执行 y = 200,此时 x 还是初始值 0。线程 2 观察到 y = 200 后,退出循环,输出 x = 0;
memory order
  • C++ 提供了 std::atomic 类模板,以保证操作原子性。同时也提供了内存顺序模型 memory_order指定内存访问,以便提供有序性和可见性。
    • memory_order_relaxed: 只保证原子操作的原子性,不提供有序性的保证
    • memory_order_consume 当前线程中依赖于当前加载的该值的读或写不能被重排到此加载前
    • memory_order_acquire 在其影响的内存位置进行获得操作:当前线程中读或写不能被重排到此加载前
    • memory_order_release 当前线程中的读或写不能被重排到此存储后
    • memory_order_acq_rel 带此内存顺序的读修改写操作既是获得操作又是释放操作
    • memory_order_seq_cst 有此内存顺序的加载操作进行获得操作,存储操作进行释放操作,而读修改写操作进行获得操作和释放操作,再加上存在一个单独全序,其中所有线程以同一顺序观测到所有修改
  • 组合出四种顺序:
    • Relaxed ordering 宽松顺序:宽松顺序只保证原子变量的原子性(变量操作的机器指令不进行重排序),但无其他同步操作,不保证多线程的有序性。
    Thread1: 
    y.load(std::memory_order_relaxed);
    
    Thread2:
    y.store(h, std::memory_order_relaxed);
    
    • Release-Acquire ordering 释放获得顺序,store 使用 memory_order_release,load 使用 memory_order_acquire,CPU 将保证如下两点:
      • store 之前的语句不允许被重排序到 store 之后(例子中的 #1 和 #2 语句一定在 store 之前执行)
      • load 之后的语句不允许被重排序到 load 之前(例子中的 #3 和 #4 一定在 load 之后执行)
    std::atomic<std::string*> ptr;
    int data;
    
    void producer()
    {
        std::string* p  = new std::string("Hello"); // #1
        data = 42;  // #2
        ptr.store(p, std::memory_order_release);
    }
    
    void consumer()
    {
        std::string* p2;
        while (!(p2 = ptr.load(std::memory_order_acquire)))
            ;
        assert(*p2 == "Hello"); // 绝无问题 #3
        assert(data == 42); // 绝无问题 #4
    }
    
    int main()
    {
        std::thread t1(producer);
        std::thread t2(consumer);
        t1.join(); t2.join();
    }
    
    • 同时 CPU 将保证 store 之前的语句比 load 之后的语句「先行发生」,即先执行 #1、#2,然后执行 #3、#4。这实际上就意味着线程 1 中 store 之前的读写操作对线程 2 中 load 执行后是可见的。注意是所有操作都同步了,不管 #3 是否依赖了 #1 或 #2
    • 值得关注的是这种顺序模型在一些强顺序系统例如 x86、SPARC TSO、IBM 主框架上是自动进行的。但在另外一些系统如 ARM、Power PC 等需要额外指令来保障。
    • Release-Consume ordering 释放消费顺序:store 使用 memory_order_release,load 使用 memory_order_consume。其效果与 Release-Acquire ordering 释放获得顺序类似,唯一不同的是并不是所有操作都同步(不够高效),而是只对依赖操作进行同步,保证其有序性上例就是 #3 一定发生在 #1 之后,因为这两个操作依赖于 ptr。但不会保证 #4 一定发生在 #2 之后(注意「释放获得顺序」可以保证这一点)。
    std::atomic<std::string*> ptr;
    int data;
    
    void producer()
    {
        std::string* p  = new std::string("Hello"); // #1
        data = 42; // #2
        ptr.store(p, std::memory_order_release);
    }
    
    void consumer()
    {
        std::string* p2;
        while (!(p2 = ptr.load(std::memory_order_consume)))
            ;
        assert(*p2 == "Hello"); // #3 绝无出错: *p2 从 ptr 携带依赖
        assert(data == 42); // #4 可能也可能不会出错: data 不从 ptr 携带依赖
    }
    
    int main()
    {
        std::thread t1(producer);
        std::thread t2(consumer);
        t1.join(); t2.join();
    }
    
    • Sequential consistency 序列一致顺序: 「释放获得顺序」是对某一个变量进行同步,Sequential consistency 序列一致顺序则是对所有变量的所有操作都进行同步。store 和 load 都使用 memory_order_seq_cst,可以理解对每个变量都进行 Release-Acquire 操作。所以这也是最慢的一种顺序模型。