• 编程中的锁。
  • 威斯康辛州大学操作系统书籍《Operating Systems: Three Easy Pieces》读书笔记系列之 Concurrency(并发)。本篇为并发技术的基础篇系列第二篇(Locks),锁。

Before

  • 开始之前先简单阐述背景,最近项目测试发现在对于底层对象存储的并发访问过程中由于未进行并发控制而产生了一些问题,所以想趁此机会回顾“锁”的相关基础知识,并在此总结,看看能不能在总结的过程中顺便实现项目里的并发访问控制。
  • 还是和往常一样,会贴很多链接,只是尝试着吸收网络上已经存在的优秀的教程,然后再进行一下转化(讲道理确实没啥新意,但主要是为了之后查阅复习起来比较方便)

参考链接

  • 先贴参考链接主要是为了膜拜一下,并以表尊敬,因为绝大数情况下,参考链接里的文献就已经写的很好了,我的下文就其实没啥营养了大概,主要是为了节省大家的时间,看完早点去嗨皮~
  • 参考链接也会持续地更新补充,文章的思路将主要沿袭教科书 Three Easy Pieces(其实我是偷懒顺便把书看了~)

Locks

  • 并发编程中的一个基本问题:我们希望原子地执行一系列指令,但由于单个处理器上出现中断(或多个线程同时在多个处理器上执行),我们无法执行。我们通过引入所谓的锁来直接解决这个问题。程序员用锁注释源代码,把它们放在临界区周围,从而确保任何这样的临界区都像执行单个原子指令一样执行。

Locks: The Basic Idea

  • 作为一个例子,假设我们的临界区是这样的,共享变量的规范更新:
balance = balance + 1;
  • 当然,也可能有其他关键部分,比如向链表添加元素,或者对共享结构进行其他更复杂的更新,但我们现在只讨论这个简单的示例。为了使用锁,我们在临界区周围添加如下代码:
1 lock_t mutex; // some globally-allocated lock ’mutex’
2 ...
3 lock(&mutex);
4 balance = balance + 1;
5 unlock(&mutex);
  • 锁只是一个变量,因此要使用一个锁,必须声明某种类型的锁变量(比如上面的互斥锁)。这个锁变量(或直接简称“lock”)在任何时刻保持锁的状态。它是可用的(或未锁定的或空闲的),因此没有线程持有该锁,也没有线程获得该锁(或锁定或持有),因此只有一个线程持有该锁,并且可能处于临界区。我们也可以存储一些额外的信息在数据类型里,例如哪个线程持有当前锁。或者一个预定锁获取的队列,但是这样的信息对锁的使用者是隐藏的。
  • lock() 和 unlock() 例程的语义很简单。调用例程lock()试图获取锁;如果没有其他线程持有这个锁(即,它是空闲的),这个线程将获得这个锁并进入临界区;这个线程有时被称为锁的所有者。如果另一个线程在这个锁变量上调用lock()(在本例中是互斥),当锁被另一个线程持有时,它不会返回;这样,当持有锁的第一个线程还在临界区中时,其他线程就无法进入临界区。
  • 一旦锁的所有者调用unlock(),锁现在就可用了。如果没有其他线程在等待锁(也就是说,没有其他线程调用了lock()并被困在锁中),锁的状态就会被更改为free。如果有正在等待的线程(卡在lock()中),其中一个将(最终)通知(或被告知)锁状态的改变,获取锁,并进入临界区
  • 锁为程序员提供了对调度的最低限度的控制。通常,我们将线程视为程序员创建但由操作系统调度的实体,以操作系统选择的任何方式。锁将部分控制权交还给程序员;通过在一段代码周围加一个锁,程序员可以保证在该代码中不会有超过一个线程处于活动状态。因此,锁有助于将传统操作系统调度的混乱转变为一种更可控的活动

Pthread Locks

  • POSIX库用于锁的名称是互斥锁,因为它用于提供线程之间的互斥,例如,如果一个线程处于临界区,它将排除其他线程进入,直到它完成临界区。因此,当你看到下面的POSIX线程代码时,你应该明白它正在做和上面一样的事情(我们再次使用我们的包装器在锁定和解锁时检查错误):
1 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
2
3 Pthread_mutex_lock(&lock); // wrapper; exits on failure
4 balance = balance + 1;
5 Pthread_mutex_unlock(&lock);

// Keeps code clean; only use if exit() OK upon failure
void Pthread_mutex_lock(pthread_mutex_t *mutex) {
  int rc = pthread_mutex_lock(mutex);
  assert(rc == 0);
}
  • 您可能还注意到,POSIX版本将一个变量传递给lock和unlock,因为我们可能使用不同的锁来保护不同的变量。这样做可以增加并发性:与在访问任何关键部分时使用一个大锁(一种粗粒度的锁定策略)不同,通常使用不同的锁来保护不同的数据和数据结构,从而允许更多的线程同时处于锁定代码中(更细粒度的方法)。

Building A Lock

  • 我们如何建立一个高效的锁?高效的锁以较低的成本提供互斥,还可能获得我们下面讨论的一些其他属性。需要什么硬件支持?操作系统支持什么?
  • 为了构建一个有效的锁,我们需要我们的老朋友——硬件,以及我们的好朋友——操作系统的帮助。多年来,许多不同的硬件原语被添加到各种计算机体系结构的指令集中;虽然我们不会研究这些指令是如何实现的(毕竟,这是计算机架构类的主题),但我们将研究如何使用它们来构建像锁一样的互斥原语。我们还将研究操作系统是如何参与进来的,从而使我们能够构建一个复杂的锁库。

Evaluating Locks

  • 在构建任何锁之前,我们应该首先理解我们的目标是什么,因此我们应该询问如何评估特定锁实现的有效性。要评价锁是否有效(以及是否有效),我们应该建立一些基本的标准。第一个问题是锁是否执行它的基本任务,即提供互斥。基本上,锁是否有效,防止多个线程进入一个临界区?
  • 第二是公平。是否每个争用锁的线程都有机会在锁空闲时获取它?另一种考虑这个问题的方法是检查更极端的情况:是否有任何争用锁的线程在争用锁的时候饿死,从而永远得不到锁?
  • 最后一个标准是性能,特别是使用锁所增加的时间开销。这里有几个不同的情况值得考虑。一种是没有争用的情况;当一个线程正在运行并获取和释放锁时,这样做的开销是什么?另一种情况是多个线程争夺单个CPU上的锁;在这种情况下,是否存在性能问题?最后,当涉及多个cpu,并且每个cpu上的线程都争用该锁时,该锁是如何执行的?通过比较这些不同的场景,我们可以更好地理解使用各种锁定技术对性能的影响,如下所述。

Controlling Interrupts

  • 最早用于提供互斥的解决方案之一是禁用临界区中断;这种解决方案是为单处理器系统而发明的。代码如下所示:
1 void lock() {
2   DisableInterrupts();
3 }
4 void unlock() {
5   EnableInterrupts();
6 }
  • 假设我们运行在这样一个单处理器系统上。通过在进入临界区之前关闭中断(使用某种特殊的硬件指令),我们可以确保临界区内的代码不会被中断,从而可以像原子一样执行。当我们完成时,我们重新启用中断(同样是通过硬件指令),这样程序就像往常一样继续运行。这种方法的主要优点是简单。你当然不需要绞尽脑汁来弄明白为什么这是可行的。在没有中断的情况下,线程可以确保它执行的代码将被执行,并且不会有其他线程干扰它。
  • 不幸的是,负面的东西很多。
    • 首先,这种方法要求我们允许任何调用线程执行特权操作(打开和关闭中断),因此相信这种功能不会被滥用。正如你已经知道的,任何时候我们被要求信任一个任意的程序,我们可能会遇到麻烦。在这里,问题表现在许多方面:贪婪的程序可能会在执行开始时调用lock(),从而垄断处理器;更糟糕的是,一个错误的或恶意的程序可能会调用lock()并进入一个无穷循环。在后一种情况下,操作系统永远无法恢复对系统的控制,只有一种方法:重启系统。使用中断禁用作为通用的同步解决方案需要对应用程序有太多的信任。
    • 第二,这种方法不能在多处理器上工作。如果多个线程运行在不同的cpu上,并且每个线程都试图进入相同的临界区,那么是否禁用中断是无关紧要的;线程可以在其他处理器上运行,因此可以进入临界区。由于多处理器现在很普遍,我们的一般解决方案必须做得比这更好。
    • 第三,长时间关闭中断可能会导致中断丢失,从而导致严重的系统问题。例如,想象一下,如果CPU没有注意到磁盘设备已经完成了一个读请求。操作系统如何知道要唤醒等待读取的进程?
    • 最后,也可能是最不重要的,这种方法可能效率低下。与普通的指令执行相比,屏蔽或取消中断的代码在现代cpu中执行得比较慢。
  • 由于这些原因,关闭中断只在有限的上下文中用作互斥原语。例如,在某些情况下,操作系统本身将使用中断屏蔽来保证访问自己的数据结构时的原子性,或者至少防止出现某些混乱的中断处理情况。这种用法是有意义的,因为信任问题在操作系统内部消失了,操作系统无论如何总是信任自己执行特权操作。

A Failed Attempt: Just Using Loads/Stores

  • 要超越基于中断的技术,我们将不得不依赖CPU硬件以及它提供给我们的构建正确锁的指令。让我们首先尝试通过使用单个标记变量来构建一个简单的锁。在这次失败的尝试中,我们将了解构建锁所需的一些基本思想,并(希望如此)了解为什么仅使用单个变量并通过正常的 load 和 store 访问它是不够的。
  • 在如下第一次尝试中,想法非常简单:使用一个简单的变量(标志)来指示某个线程是否拥有锁。进入临界区的第一个线程将调用lock(),它将测试该标志是否等于1(在本例中,它不是),然后将该标志设置为1,以表明该线程现在持有锁。当临界区结束时,线程调用unlock()并清除标志,从而表明锁不再被持有。
  • 如果另一个线程碰巧在第一个线程处于临界区时调用了lock(),它将在while循环中简单地spin-wait,以便该线程调用unlock()并清除标志。一旦第一个线程这样做了,等待的线程将退出while循环,为自己设置标志为1,并继续进入临界区。
1 typedef struct __lock_t { int flag; } lock_t;
2
3 void init(lock_t *mutex) {
4   // 0 -> lock is available, 1 -> held
5   mutex->flag = 0;
6 }
7
8 void lock(lock_t *mutex) {
9   while (mutex->flag == 1) // TEST the flag
10      ; // spin-wait (do nothing)
11  mutex->flag = 1; // now SET it!
12 }
13
14 void unlock(lock_t *mutex) {
15  mutex->flag = 0;
16 }

  • 不幸的是,代码有两个问题:一个是正确性,另一个是性能。一旦您习惯了并行编程,正确性问题就很容易看到。想象一下如果代码交错;假设flag=0开始。
    20210401172110
  • 正如您从这个交错中看到的,通过及时(不及时?)中断,我们可以很容易地产生这样一种情况,即两个线程都将标志设置为1,从而两个线程都能够进入临界区。这种行为被专业人士称为“bad”——我们显然没有提供最基本的要求:提供互斥。
  • 性能问题(我们将在后面详细讨论)是这样一个事实:线程等待获取已经持有的锁的方式:它无休止地检查flag的值,这种技术称为旋转等待。旋转等待浪费了等待另一个线程释放锁的时间。在单处理器上,这种浪费是非常高的,服务端正在等待的线程甚至不能运行(至少在发生上下文切换之前)!因此,当我们向前推进并开发更复杂的解决方案时,我们也应该考虑避免这种浪费的方法。

ASIDE: DEKKER’S AND PETERSON’S ALGORITHMS

  • 在20世纪60年代,Dijkstra向他的朋友们提出了并发问题,其中一位名叫Theodorus Jozef Dekker的数学家提出了一个解决方案。我们这里讨论的解决方案使用特殊的硬件指令甚至操作系统支持,而Dekker的算法只使用 load 和 store(假设它们彼此之间是原子的,这在早期的硬件上是正确的)。
  • Dekker的方法后来被Peterson改进了。同样,只使用load和store,这样做是为了确保两个线程不会同时进入临界区。下面是Peterson的算法(适用于两个线程);看看你能不能理解代码。flag 和 turn 的用途是什么? 如下代码其实同时使用了 flag 和 turn 来调度线程,对应线程的 flag 置为了 1 说明该线程即将持有锁,和前面的代码类似,但是还使用了一个 turn 来表明下一个需要调度的线程的 ID,此时两个线程都运行到了 while 处时,条件 1 必定为真,条件二必定其中一个线程在执行时为真,该线程相应地进行自旋
  • flag 表示哪个线程想要占用临界区状态,就是举手表示想要访问临界区
  • turn 用于标识当前允许谁进入,就是谁的轮次
int flag[2];
int turn;
void init() {
    // indicate you intend to hold the lock w/ ’flag’
    flag[0] = flag[1] = 0;
    // whose turn is it? (thread 0 or 1)
    turn = 0;
}
void lock() {
    // ’self’ is the thread ID of caller
    // 首先举手表示当前线程要访问临界区,eg 线程 1,即 flag[1]=1
    flag[self] = 1;
    // make it other thread’s turn
    // 这里其实有一个“礼让”的逻辑
    // 就是说虽然是当前线程在执行,但是会把轮次让给另外的线程
    // 如果另外的那个线程确实也举手了,
    //        然后另外的线程还没来得及礼让,那么就由另外的线程执行
    //        要是另外的线程也刚好礼让了,那么就由当前线程执行
    turn = 1 - self;
    // 首先看另外的线程举手了没?
    // case0. 没举手那就只有当前线程访问,直接运行就完事了
    // case1. 另外的线程举手了,那这时候就要看该谁的轮次了
    //   case1.0 如果这时候的轮次确实该另外的线程 1-self,那当前线程就自旋等待
    //   case2.0 如果这时候的轮次该自个儿 self,那当前线程就执行
    while ((flag[1-self] == 1) && (turn == 1 - self))
        ;  // spin-wait while it’s not your turn
}
void unlock() {
    // simply undo your intent
    flag[self] = 0;
}
  • 由于某些原因,开发不需要特殊硬件支持就能工作的锁在一段时间内变得非常流行,这给理论类型带来了许多问题。当然,当人们意识到假定有一点硬件支持会容易得多(事实上,这种支持早在多处理的早期就存在了)时,这一行的工作就变得毫无用处了。此外,上面的算法不能在现代硬件上运行(由于内存一致性模型不严格),因此它们比以前更没用了。然而,更多的研究被扔进了历史的垃圾箱
  • Peterson 算法结合了 LockOne 和 LockTwo 两种算法
    • LockOne 只满足互斥,无法避免死锁。
    • LockTwo 也只满足互斥,无法避免死锁
  • https://zh.wikipedia.org/wiki/Peterson%E7%AE%97%E6%B3%95#cite_note-3
  • 并发编程的艺术02-过滤锁算法
class LockOne implements Lock {
    private boolean[] flag = new boolean[2];
    
    // 因为 Lock 本身不互斥,所以两个线程可能同时都将对方的 flag 标识设置为 true
    // 导致两个线程都被阻塞在 while(true) {} 。
    public void lock() {
        int i = ThreadId.get();
        int j = 1 - i;
        flag[i] = true;
        while(flag[j]) {}
    }
    
    public void unlock() {
        int i = ThreadID.get();
        flag[i] = false;
    }
}

class LockTwo implements Lock {
    private volatile int victim;
    
    // 当一个线程先执行,而另一个线程迟迟不执行
    // 这时候第一个线程就被锁住无法进行向下执行
    public void lock() {
        int i = ThreadId.get();

        // 阻塞率先执行 victim = i 的线程(礼让执行)
        victim = i;
        while(victim == i) {}
    }
    
    public void unlock() {
    }
}

class PetersonLock implements Lock {
    private boolean[] flag = new boolean[2];
    private volatile int victim;
    
    public void lock() {
        int i = ThreadId.get();
        int j = 1 - i;
        flag[i] = true;
        victim = i;
       while(flag[j] && victim == i) {}
    }
    
    public void unlock() {
        int i = ThreadID.get();
        flag[i] = false;
    }
}

class FilterLock implements Lock {
    int[] level;
    int[] victim;
    int n;
    
    public FilterLock(int n) {
        level = new int[n];
        victim = new int[n];
        this.n = n;
        for (int i = 0;i < n; i++) {
            level[i] = 0;
       }
    }
    
    public void lock() {
        int me = ThreadID.get();
        for (int i = 0; i < n; i++) {
            level[me] = i;
            victim[i] = me;
            for (int k = 0; k < n;k++) {
                while ((k != me) && (level[k] >= i && victim[i] == me))) {
                    
                }
            }
        }
    }
    
    public void unlock() {
        int me = ThreadID.get();
        level[me] = 0;
    }
}

Building Working Spin Locks with Test-And-Set

  • 当使用TAS实现TASLock (Test And Set Lock)测试-设置锁,它的特点是自旋时,每次尝试获取锁时,底层还是使用CAS操作,不断的设置锁标志位的过程会一直修改共享变量的值(回写),会引发缓冲一致性流量发风暴。【因为每一次CAS操作都会发出广播通知其他处理器,从而影响程序的性能。】
  • Wikipedia: Test and Set
  • 因为禁用中断不能在多个处理器上工作,而且使用load和store(如上所示)的简单方法也不能工作,所以系统设计人员开始发明对锁定的硬件支持。最早的多处理器系统,如20世纪60年代早期的Burroughs B5000,就有这样的支持;如今,所有系统都提供这种类型的支持,即使是单CPU系统。
  • 需要理解的最简单的硬件支持是 test-and-set (或 atomic exchange) 指令。我们通过下面的C代码片段来定义 test-and-set 指令的作用:
1 int TestAndSet(int *old_ptr, int new) {
2   int old = *old_ptr; // fetch old value at old_ptr
3   *old_ptr = new; // store ’new’ into old_ptr
4   return old; // return the old value
5 }

1 typedef struct __lock_t {
2   int flag;
3 } lock_t;
4
5 void init(lock_t *lock) {
6   // 0: lock is available, 1: lock is held
7   lock->flag = 0;
8 }
9
10 void lock(lock_t *lock) {
11  while (TestAndSet(&lock->flag, 1) == 1)
12      ; // spin-wait (do nothing)
13 }
14
15 void unlock(lock_t *lock) {
16  lock->flag = 0;
17 }
  • test-and-set 指令的作用如下。它返回旧 ptr 所指向的旧值,并同时将该值更新为new。当然,关键是这个操作序列是以原子的方式执行的。 之所以称之为 “test-and-set”,是因为它使您能够同时“test”旧值(即返回的值)“set”内存位置为一个新值;事实证明,这个稍微强大一些的指令足以构建一个简单的旋转锁。
  • 我们先弄清楚为什么这把锁可以用。首先想象一下这样一种情况:
    • 一个线程调用了lock(),而当前没有其他线程持有该锁;因此,flag应该是0。当线程调用 TestAndSet(flag,1),线程返回旧的 flag 值,即 0; 因此,正在 testing flag 值的调用线程将不会在 while 循环中被捕获,并将获得锁。线程也会自动地将该值设置为 1,从而表明现在已持有锁。当线程完成它的临界区时,它调用unlock()将标志设为0。
    • 我们可以想象的第二种情况是,当一个线程已经持有锁(即,flag是1)。在这种情况下,这个线程将调用lock(),然后调用 TestAndSet(flag, 1)。这一次,teststandset()将返回旧的flag值,即1(因为锁被持有),同时再次将其设置为1。只要锁被另一个线程持有,TestAndSet()就会反复返回1,因此这个线程就会不停地旋转,直到锁最终被释放。当标志最终被其他线程设置为0时,该线程将再次调用 TestAndSet(),它现在将返回0,同时原子地将值设置为1,从而获得锁并进入临界区。
  • 通过将 Test(对旧锁的值)和 Set(对新值的值)都设置为单个原子操作,我们确保只有一个线程获得锁。这就是如何构建一个有效的互斥原语!
  • 现在可能也理解了为什么这种类型的锁通常称为自旋锁。它是需要构建的最简单的锁类型,只是使用CPU周期旋转,直到锁可用为止。为了在单个处理器上正常工作,它需要一个抢占式调度程序(即,为了不时地运行另一个线程,它会通过计时器中断一个线程)。如果没有抢占,旋转锁在单个CPU上没有多大意义,因为在CPU上旋转的线程永远不会放弃它。
  • 汇编代码实现:
enter_region:        ; A "jump to" tag; function entry point.

  tsl reg, flag      ; Test and Set Lock; flag is the
                     ; shared variable; it is copied
                     ; into the register reg and flag
                     ; then atomically set to 1.

  cmp reg, #0        ; Was flag zero on entry_region?

  jnz enter_region   ; Jump to enter_region if
                     ; reg is non-zero; i.e.,
                     ; flag was non-zero on entry.

  ret                ; Exit; i.e., flag was zero on
                     ; entry. If we get here, tsl
                     ; will have set it non-zero; thus,
                     ; we have claimed the resource
                     ; associated with flag.

leave_region:
  move flag, #0      ; store 0 in flag
  ret                ; return to caller

Evaluating Spin Locks

  • 有了我们的基本自旋锁,我们现在可以评估它在我们前面描述的轴上的有效性。锁最重要的方面是正确性:它是否提供了互斥?答案是肯定的:旋转锁一次只允许一个线程进入临界区。这样,我们就有了一个正确的锁。
  • 下一个轴是公平。旋转锁对等待的线程有多公平?你能保证等待的线程会进入临界区吗?不幸的是,这里的答案是坏消息:旋转锁不提供任何公平性保证。的确,在争用的情况下,在旋转等待的线程可能一直旋转。简单的旋转锁(到目前为止已经讨论过)是不公平的,可能会导致线程饿死
  • 最后一个轴是性能。使用旋转锁的成本是多少?为了更仔细地分析这个问题,我们建议考虑几个不同的案例。在第一种情况下,想象线程在单个处理器上竞争锁;在第二种情况下,考虑分布在多个cpu上的线程。
    • 对于自旋锁,在单一CPU的情况下,性能开销是非常痛苦的;想象一下持有锁的线程在一个临界区中被抢占的情况。调度程序可能会运行其他每一个线程(假设有N−1个其他线程),每个线程都试图获取锁。在这种情况下,每个线程都将在一个时间片的持续时间内旋转,然后放弃CPU,这是对CPU周期的浪费。
    • 但是,在多个cpu上,旋转锁工作得相当好(如果线程的数量大致等于cpu的数量)。假设CPU 1上的线程A和CPU 2上的线程B都争用一个锁。如果线程(CPU 1)持有锁,线程B试图加锁,B将自旋(CPU 2)。然而,假定关键的部分较短,因此很快锁就变得可用,线程B将获得锁。旋转等待锁在另一个处理器不会浪费很多的周期在本例中,从而可以很有效。
  • 更多避免旋转的理由:优先级反转
  • 避免旋转锁的一个很好的理由是性能:正如正文中所描述的,如果一个线程在持有一个锁时被中断,其他使用旋转锁的线程将花费大量的CPU时间来等待锁可用。然而,在某些系统上避免自旋锁还有另一个有趣的原因:正确性。要警惕的问题是所谓的优先级反转。
  • 现在,这个问题。假设T2由于某种原因被阻塞。所以T1运行,抓住旋转锁,进入一个临界区。T2现在变得畅通(可能是因为一个I/O完成了),CPU调度器会立即调度它(从而重新调度T1)。T2现在尝试获取锁,因为它不能(T1持有锁),所以它只是继续旋转。因为锁是自旋锁,T2永远自旋,系统就挂了。
  • 不幸的是,仅仅避免使用旋转锁并不能避免反转的问题(唉)。假设有三个线程,T1, T2和T3。T3的优先级最高,T1的优先级最低。现在想象一下,T1抓住了一个锁。然后T3启动,由于它的优先级高于T1,所以它立即运行(抢占T1)。T3试图获取T1所持有的锁,但是由于T1仍然持有锁,所以被困在等待中。如果T2开始运行,它将拥有比T1更高的优先级,因此它将运行。T3的优先级高于T2,它被困在等待T1,由于T2正在运行,T1可能永远不会运行。强大的T3不能运行,而不起眼的T2控制着CPU ?高优先级已经不像以前那么重要了。
  • 您可以通过多种方式解决优先级反转问题。在自旋锁导致问题的特定情况下,可以避免使用自旋锁(后面将详细描述)。一般来说,高优先级线程等待低优先级线程可以暂时提高低优先级线程的优先级,从而使其能够运行并克服反转,这种技术称为优先级继承。最后一个解决方案是最简单的:确保所有线程具有相同的优先级。

Compare-And-Swap

  • 在计算机科学中,比较与交换(CAS)是多线程中用来实现同步的原子指令。它将内存位置的内容与给定值进行比较,只有当它们相同时,才将该内存位置的内容修改为新的给定值。这是作为单个原子操作完成的
  • Wikipedia: CAS
  • 一些系统提供的另一个硬件原语是 Compare-And-Swap 指令(例如,SPARC上这样称呼它)或compare-and-exchange 指令(x86上这样称呼它)。也就是我们常说的 CAS 操作。这条指令的C伪代码如下:
1 int CompareAndSwap(int *ptr, int expected, int new) {
2   int original = *ptr;
3   if (original == expected)
4       *ptr = new;
5   return original;
6 }
  • 其基本思想是通过 compare-and-swap 来测试ptr指定的地址上的值是否等于预期值;如果是,用新值更新ptr所指向的内存位置。如果不是,那就什么都不做。在这两种情况下,返回该内存位置的原始值,从而允许调用 compare-and-swap 的代码知道它是否成功。
  • CAS 操作其实就比 TAS 多了一个参数,expected 预期值,预期值 E 主要用于和当前值的比对,从而来决定是否更新新的值。
    20210404172916
  • 使用 compare-and-swap 指令,我们可以以类似于使用 test-and-set 的方式构建锁。例如,我们可以用以下代码替换上面的lock()例程:
1 void lock(lock_t *lock) {
2   while (CompareAndSwap(&lock->flag, 0, 1) == 1)
3       ; // spin
4 }
  • 其余的代码与上面的测试集示例相同。这段代码的工作原理非常相似;它只是检查标志是否为0,如果是,就自动交换1,从而获得锁。当锁被持有时,试图获取锁的线程将被困在旋转中,直到锁最终被释放。
  • 如果您想了解如何真正制作一个 c 可调用的x86版本的 compare-and-swap,看以下链接:
    https://github.com/remzi-arpacidusseau/ostep-code/tree/master/threads-locks
#include <stdio.h>

int global = 0;

char compare_and_swap(int *ptr, int old, int new) {
    unsigned char ret;
    // Note that sete sets a ’byte’ not the word
    __asm__ __volatile__ (
	" lock\n"
	" cmpxchgl %2,%1\n"
	" sete %0\n"
	: "=q" (ret), "=m" (*ptr)
	: "r" (new), "m" (*ptr), "a" (old)
	: "memory");
    return ret;
}

int main(int argc, char *argv[]) {
    printf("before successful cas: %d\n", global);
    int success = compare_and_swap(&global, 0, 100);
    printf("after successful cas: %d (success: %d)\n", global, success);
    
    printf("before failing cas: %d\n", global);
    success = compare_and_swap(&global, 0, 200);
    printf("after failing cas: %d (old: %d)\n", global, success);

    return 0;
}
  • 最后,您可能已经感觉到,compare_and_swap 指令比 test-and-set 指令更强大。我们将在以后简要地研究诸如无锁同步等主题时使用这种能力。然而,如果我们只是用它构建一个简单的自旋锁,它的行为与我们上面分析的自旋锁相同。

Load-Linked and Store-Conditional

  • 一些平台提供了一对协同工作的指令来帮助构建关键部分。例如,在MIPS架构中,load-linked 和 store-conditional 指令可以串联使用来构建锁和其他并发结构。这些指令的 C 伪代码如下。Alpha、PowerPC和ARM提供了类似的说明
1 int LoadLinked(int *ptr) {
2   return *ptr;
3 }
4
5 int StoreConditional(int *ptr, int value) {
6   if (no update to *ptr since LoadLinked to this address) {
7       *ptr = value;
8       return 1; // success!
9   } else {
10      return 0; // failed to update
11  }
12 }
  • load-linked的操作与典型的load指令非常相似,它只是从内存中获取一个值并将其放入寄存器中。关键的不同在于store-conditional,它只在没有对该地址进行存储的情况下才成功(并更新存储在刚刚进行load-linked的地址上的值)。在成功的情况下,store-conditional返回1并将ptr处的值更新为value;如果失败,则ptr处的值不会更新,返回0。
  • lock()代码是唯一有趣的部分。首先,线程旋转,等待标志被设置为0(从而表明锁没有被持有)。一旦这样,线程试图通过store条件获取锁;如果成功,线程就自动地将标志的值更改为1,因此可以进入临界区
  • 请注意 store-conditional 是如何发生故障的。一个线程调用lock()并执行LoadLinked,当锁未被持有时返回0。在它尝试 store-conditional 之前,它被中断,另一个线程进入 lock代码,也执行 load-linked 指令,得到一个0,然后继续。此时,两个线程分别执行了load-linked,并且都准备尝试store-conditional。这些指令的关键特性是,只有其中一个线程会成功地将标志更新为1,从而获得锁;第二个尝试 store-conditional 的线程将失败(因为另一个线程更新了它的load-linked和store-conditional之间的标志值),因此必须再次尝试获取锁。
1 void lock(lock_t *lock) {
2   while (1) {
3       while (LoadLinked(&lock->flag) == 1)
4           ; // spin until it’s zero
5       if (StoreConditional(&lock->flag, 1) == 1)
6           return; // if set-it-to-1 was a success: all done
7                   // otherwise: try it all over again
8   }
9 }
10
11 void unlock(lock_t *lock) {
12  lock->flag = 0;
13 }
  • 简化一下如上代码
1 void lock(lock_t *lock) {
2   while (LoadLinked(&lock->flag) ||
3           !StoreConditional(&lock->flag, 1))
4       ; // spin
5 }

Fetch-And-Add

  • 最后一个硬件原语是“Fetch-And-Add”指令,它在返回特定地址的旧值时自动增加一个值。Fetch-And-Add指令的C伪代码是这样的:
1 int FetchAndAdd(int *ptr) {
2   int old = *ptr;
3   *ptr = old + 1;
4   return old;
5 }
  • 在本例中,我们将使用取加(fetch-and-add)来构建一个更有趣的 ticket lock,。锁定和解锁代码如下:
1 typedef struct __lock_t {
2   int ticket;
3   int turn;
4 } lock_t;
5
6 void lock_init(lock_t *lock) {
7   lock->ticket = 0;
8   lock->turn = 0;
9 }
10
11 void lock(lock_t *lock) {
12  int myturn = FetchAndAdd(&lock->ticket);
13  while (lock->turn != myturn)
14      ; // spin
15 }
16
17 void unlock(lock_t *lock) {
18  lock->turn = lock->turn + 1;
19 }

  • 这个解决方案不是使用单个值,而是组合使用 ticket 和 turn 变量来构建一个锁。基本操作非常简单:当线程希望获取锁时,它首先对票据值执行原子的取加操作;这个值现在被认为是这个线程的“回合”(myturn)。然后使用全局共享的 lock->turn 来确定线程的回合;当(myturn == turn)对于一个给定的线程,它是线程进入临界区的回合。解锁可以通过增加回合数来实现,这样下一个等待线程(如果有的话)就可以进入临界区。
  • 注意这个解决方案与我们之前尝试的一个重要区别:它确保了所有线程的执行。一旦给一个线程分配了它的票证值,它将在未来的某个时间点被调度(一旦它前面的线程通过了临界区并释放了锁)。在我们以前的尝试中,并不存在这样的保证;在 test-and-set 上自旋的线程(例如)可能会永远自旋,即使其他线程获得和释放锁。

Too Much Spinning: What Now?

  • 我们的简单的基于硬件的锁很简单(只有几行代码),而且它们可以工作(如果您愿意,甚至可以通过编写一些代码来证明这一点),这是任何系统或代码的两个优秀特性。然而,在某些情况下,这些解决方案可能非常低效。假设您在一个处理器上运行两个线程。现在,假设有一个线程(线程0)处于临界区,因此持有一个锁,不幸的是被中断。第二个线程(线程1)现在尝试获取锁,但发现锁被持有。因此,它开始自旋,接着自旋。
  • 然后它继续自旋。最后,时钟中断产生,线程0再次运行,释放锁,最后(比如,下一次运行时),线程1不必自选了,可以获得锁。因此,任何时候一个线程在这种情况下被捕获,它都浪费了整个时间片,自旋只检查一个不会改变的值! 当有N个线程争用一个锁时,这个问题会变得更糟;N−1个时间片也可能以类似的方式浪费,只是自旋并等待一个线程释放锁。因此,我们的下一个问题:我们怎样才能开发出一个不需要浪费时间在CPU上自旋的锁呢
  • 仅靠硬件支持无法解决这个问题。我们也需要操作系统的支持!现在让我们来看看它是如何工作的。

A Simple Approach: Just Yield, Baby

  • 硬件支持让我们走得很远:工作锁,甚至(就像票据锁的情况)获取锁的公平性。然而,我们仍然有一个问题:当临界区发生上下文切换,并且线程开始无休止地旋转,等待被中断时,该怎么办让(持有锁的)线程再次运行?
  • 我们的第一个尝试是一个简单而友好的方法:当要自旋时,将CPU让给另一个线程。如下演示了这种方法:
1 void init() {
2   flag = 0;
3 }
4
5 void lock() {
6   while (TestAndSet(&flag, 1) == 1)
7       yield(); // give up the CPU
8 }
9
10 void unlock() {
11  flag = 0;
12 }

  • 在这种方法中,我们假设一个操作系统原语 yield(),当线程想要放弃CPU并让另一个线程运行时,可以调用它。线程可以处于以下三种状态中的一种(运行、准备或阻塞); yield只是一个系统调用,它将调用者从运行状态移动到就绪状态,从而使另一个线程运行。因此,yield 的线程本质上是对自己进行了重新调度
  • 考虑一个CPU上有两个线程的例子;在这种情况下,我们基于 yield 的方法非常有效。如果一个线程碰巧调用了lock(),并发现一个锁被持有,它就会释放CPU,这样另一个线程就会运行并完成它的临界区。在这个简单的例子中,yielding 方法很有效。
  • 现在让我们考虑一下有许多线程(比如100个)反复争夺一个锁的情况。在这种情况下,如果一个线程获得了锁,并在释放它之前被抢占,其他99个线程将调用lock(),找到所持有的锁,并交出CPU。假设有某种轮询调度器,在持有锁的线程再次运行之前,这99个调度器中的每一个都将执行这个run-and-yield模式。虽然比我们的旋转方法更好(这将浪费99个时间切片旋转),但这种方法仍然是昂贵的;上下文切换的成本可能很高,因此会产生大量的浪费
  • 更糟糕的是,我们根本没有解决饥饿问题。当其他线程反复进入和退出临界区时,一个线程可能会陷入一个无穷 yield 循环。我们显然需要一种直接解决这一问题的方法。

Using Queues: Sleeping Instead Of Spinning

  • 我们之前方法的真正问题是,它们留给了太多的机会。调度程序决定下一个线程运行;如果调度程序做出了错误的选择,那么运行的线程要么必须自旋,等待锁(我们的第一种方法),要么立即 yield CPU (我们的第二种方法)。无论哪种方式,都有浪费的可能,而且无法防止饥饿。
  • 因此,在当前持有者释放锁之后,必须显式地对下一个获得锁的线程施加一些控制。为此,我们需要更多的操作系统支持,以及一个队列来跟踪哪些线程正在等待获取锁
  • 为简单起见,我们将使用 Solaris 提供的支持,包括两种调用: park() 将调用线程置于睡眠状态,unpark(threadID) 唤醒由threadID指定的特定线程。 这两个例程可以一起使用来构建一个锁,如果调用者试图获取一个被占用的锁,那么这个锁将使调用者处于休眠状态,当锁空闲时将其唤醒。如下代码所示
1 typedef struct __lock_t {
2   int flag;
3   int guard;
4   queue_t *q;
5 } lock_t;
6
7 void lock_init(lock_t *m) {
8   m->flag = 0;
9   m->guard = 0;
    // 初始化锁的等待队列
10  queue_init(m->q);
11 }
12
13 void lock(lock_t *m) {
    // 尝试获取锁,将 guard 设置为 1,如果已经为 1 自旋等待
14  while (TestAndSet(&m->guard, 1) == 1)
15      ; //acquire guard lock by spinning
16  if (m->flag == 0) {
        // 获取到了锁,将 guard 置为 0
17      m->flag = 1; // lock is acquired
18      m->guard = 0;
19  } else {
        // 未获取到锁,加入该锁的等待队列中
20      queue_add(m->q, gettid());
        // 将 guard 置为 0
21      m->guard = 0;
        // 将该线程休眠,等待唤醒
22      park();
23  }
24 }
25
26 void unlock(lock_t *m) {
    // 将 guard 设置为 1,如果已经为 1 自选等待
27  while (TestAndSet(&m->guard, 1) == 1)
28      ; //acquire guard lock by spinning
29  if (queue_empty(m->q))
        // 如果该锁的等待队列为空,那就标记锁为空闲态
30      m->flag = 0; // let go of lock; no one wants it
31  else
        // 如果该锁的等待队列不为空,相应的出队一个线程,唤醒该线程去获取锁
32      unpark(queue_remove(m->q)); // hold lock
33                                  // (for next thread!)
    // 将 guard 设置为 0
34  m->guard = 0;
35 }

  • 在这个例子中,我们做了一些有趣的事情。首先,我们将旧的 test-and-set 思想与一个显式的锁等待队列相结合,以获得更高效的锁。其次,我们使用队列来帮助控制谁下一个获得锁,从而避免饥饿
  • 您可能会注意到 Guard 是如何使用的,它基本上是围绕着锁所使用的标志和队列操作的自旋锁。因此,这种方法不能完全避免旋转等待;一个线程可能在获取或释放锁时被中断,从而导致其他线程旋转——等待这个线程再次运行。然而,旋转所花费的时间非常有限(锁定和解锁代码中只有几条指令,而不是用户定义的关键部分),因此这种方法可能是合理的。
  • 您可能还会注意到,在lock()中,当线程无法获得锁(它已经被持有)时,我们会小心地将自己添加到队列中(通过调用gettid()函数来获取当前线程的线程ID),将guard设置为0,并 yield CPU。给读者一个问题:如果在park()之后,而不是之前,释放守卫锁会发生什么?提示: something bad。(park之后设置guard的话,就会出现guard一直为1的情况,然后一直陷入自旋等待
  • 您还可以进一步检测到,当另一个线程被唤醒时,该标志没有被设回0。这是为什么呢?好吧,这不是错误,而是必须的!当一个线程被唤醒时,它将像从park()返回一样;然而,它在代码的那一点上并没有保持保护,因此甚至不能尝试将标志设置为1。因此,我们只需将释放锁的线程直接传递给获取锁的下一个线程;flag在两者之间没有设置为0
  • 最后,您可能会注意到解决方案中在调用park()之前出现了可感知的竞争条件。在错误的时间(比如 park 调用之前),一个线程将会被暂停,假设它应该休眠,直到锁不再被持有。此时切换到另一个线程(例如,持有锁的线程)可能会导致问题,例如,如果该线程随后释放了锁。第一个线程之后的park 将永远休眠(潜在的),这个问题有时被称为wakeup/waiting race。为了解决这个问题,需要做一些额外的工作。(其实就是在无限等待 park
  • Solaris通过添加第三个系统调用 setpark() 解决了这个问题。通过调用这个例程,线程可以指示它将要 park。如果它恰好被中断了,刚好另一个线程被调度,并且另一个线程在实际调用 park 之前调用unpark,那么随后的 park 将立即返回,而不是休眠。lock() 内部的代码修改非常小
1 queue_add(m->q, gettid());
2 setpark(); // new code
3 m->guard = 0;
  • 另一种解决方案可以将 guard 传递到内核中。在这种情况下,内核可以采取预防措施,原子地释放锁并使正在运行的线程退出队列。

Different OS, Different Support

  • 到目前为止,我们已经看到了操作系统为了在线程库中构建更有效的锁而提供的一种支持。其他操作系统也提供了类似的支持;细节有所不同。
  • 例如,Linux提供了一个futex,它类似于Solaris接口,但提供了更多的内核功能。具体来说,每个futex都有一个特定的物理内存位置,以及一个每个futex的内核队列。调用者可以根据需要使用futex调用(如下所述)来睡眠和唤醒。
  • 具体来说,有两个调用可用。对 futex_wait(address, expected) 的调用将调用线程置于睡眠状态,假设 address 上的值等于 expected。如果不相等,则调用立即返回。对例程futex_wake(address) 的调用唤醒一个正在队列中等待的线程。这些调用在 Linux 互斥中的用法如下所示:
1 void mutex_lock (int *mutex) {
2   int v;
3   /* Bit 31 was clear, we got the mutex (the fastpath) */
    // 自旋锁
4   if (atomic_bit_test_set (mutex, 31) == 0)
5       return;
    // 请求锁,相应的 mutex+1
6   atomic_increment (mutex);
7   while (1) {
        // 被unlock唤醒了!!获取锁然后维护等待队列长度
8       if (atomic_bit_test_set (mutex, 31) == 0) {
9           atomic_decrement (mutex);
10          return;
11      }
12      /* We have to waitFirst make sure the futex value
13         we are monitoring is truly negative (locked). */
14      v = *mutex;
        // 判断整数的正负,正则为被持有
15      if (v >= 0)
16          continue;
        // 为负,即该锁将被持有,会有竞争发生,休眠线程
        // 原子性的检查 mutex 中计数器的值是否为val,如果是则让进程休眠,
        // 直到FUTEX_WAKE或者超时(time-out)。也就是把进程挂到 mutex 相对应的等待队列上去。
17      futex_wait (mutex, v);
18  }
19 }
20
21 void mutex_unlock (int *mutex) {
22  /* Adding 0x80000000 to counter results in 0 if and
23  only if there are not other interested threads */
    // 解锁,如果等待队列长度是0就不用唤醒!
    // 不把这个逻辑放futex_wake是为了减少sys call的开销。
24  if (atomic_add_zero (mutex, 0x80000000))
25      return;
26
27  /* There are other threads waiting for this mutex,
28     wake one of them up. */
    // 唤醒等待 mutex 的进程
29  futex_wake (mutex);
30 }
  • 这段来自nptl库(gnu libc库的一部分)中的 lowlevellock.h 的代码片段很有趣,原因有几个。首先,它使用单个整数来跟踪锁是否被持有(整数的高位)和锁上的等待者数量(所有其他位)。因此,如果锁是负的,它将被持有(因为设置了高位,该位决定了整数的符号)。
  • 其次,代码片段展示了如何针对常见情况进行优化,特别是当没有争用锁的时候;只有一个线程获取和释放锁,完成的工作很少(获取锁时 TestAndSet 原子位运算,释放锁原子位加法)。
  • 看看您能否解开这个“现实世界”锁的其余部分,以理解它是如何工作的。做到这一点,成为Linux锁的大师,或者至少是当一本书告诉你要做什么的时候倾听的人。

Two-Phase Locks

  • 最后一点:Linux方法有一种老方法,这种方法已经断断续续地使用了多年,至少可以追溯到 Dahm 锁在1960年代早期,现在被称为两阶段锁。两阶段锁意识到自旋是有用的,特别是当锁即将被释放的时候。所以在第一阶段,锁会自旋一段时间,希望它能获得锁
  • 但是,如果在第一个旋转阶段没有获得锁,那么就会进入第二个阶段,在这个阶段中调用者被置于睡眠状态,直到后来锁被释放时才会被唤醒。上面的Linux锁就是这种锁的一种形式,但它只旋转一次;更常见的方式是在循环中自旋固定的次数,然后使用 futex 睡眠。
  • 两阶段锁是混合方法的另一个实例,在这种方法中,结合两个好的想法可能会产生一个更好的想法。当然,是否这样做很大程度上取决于许多因素,包括硬件环境、线程数量和其他工作负载细节。像往常一样,制作一个适用于所有可能用例的通用锁是一个很大的挑战。

Summary

  • 上面的方法展示了目前如何构建真正的锁:一些硬件支持(以更强大的指令的形式)加上一些操作系统支持(例如,在Solaris上以park()和unpark()原语的形式,或在Linux上以futex的形式)。当然,细节是不同的,执行这种锁定的确切代码通常是经过高度调整的。如果您想了解更多细节,请查看Solaris或Linux代码库;它们是一本引人入胜的读物。也可以参阅David等人关于现代多处理器上的锁策略比较的优秀工作。

Lock-based Concurrent Data Structures

  • 对于特定数据结构,如何加锁才能让该结构功能正确?进一步,如何对该数据结构加锁,能够保证高性能,让许多线程同时访问该结构,即并发访问(concurrently)?
  • 这里会简单介绍一些并发的数据结构,后期补充一个 并发跳表

Concurrent Counters

  • 如下先定义一个非并发的计数器,操作很简单,主要三种操作。+ / - / r
1 typedef struct counter_t {
2   int value;
3 } counter_t;
4
5 void init(counter_t *c) {
6   c->value = 0;
7 }
8
9 void increment(counter_t *c) {
10  c->value++;
11 }
12
13 void decrement(counter_t *c) {
14  c->value--;
15 }
16
17 int get(counter_t *c) {
18  return c->value;
19 } 
  • 为了实现并发的效果,简单粗暴地加锁就完事了。其实很好理解,就是在相应的非线程安全函数执行之前对应的加锁和释放锁就 OK。显然简单粗暴多半就意味着性能是有一定问题的。
1 typedef struct counter_t {
2   int value;
3   pthread_mutex_t lock;
4 } counter_t;
5
6 void init(counter_t *c) {
7   c->value = 0;
8   Pthread_mutex_init(&c->lock, NULL);
9 }
10
11 void increment(counter_t *c) {
12  Pthread_mutex_lock(&c->lock)
13  c->value++;
14  Pthread_mutex_unlock(&c->lock);
15 }
16
17 void decrement(counter_t *c) {
18  Pthread_mutex_lock(&c->lock);
19  c->value--;
20  Pthread_mutex_unlock(&c->lock);
21 }
22
23 int get(counter_t *c) {
24  Pthread_mutex_lock(&c->lock);
25  int rc = c->value;
26  Pthread_mutex_unlock(&c->lock);
27  return rc;
28 } 

  • 为了理解简单方法的性能代价,我们运行了一个基准测试,其中每个线程更新一个共享计数器固定次数;然后我们改变线程的数量。下图显示了在一到四个线程处于活动状态时所花费的总时间;每个线程更新计数器一百万次。这个实验是在装有四个Intel 2.7 GHz i5 cpu的iMac上进行的;随着更多的cpu处于活动状态,我们希望在单位时间内完成更多的工作
  • 从图上方的曲线(标为“Precise”)可以看出,同步的计数器扩展性不好。单线程完成 100 万次更新只需要很短的时间(大约 0.03s),而两个线程并发执行,每个更新 100 万次,性能下降很多(超过 5s!)。线程更多时,性能更差。
  • 理想情况下,你会看到多处理上运行的多线程就像单线程一样快。达到这种状态称为完美扩展(perfect scaling)。虽然总工作量增多,但是并行执行后,完成任务的时间并没有增加。
    20210404221747

Scalable Counting

  • 令人吃惊的是,关于如何实现可扩展的计数器,研究人员已经研究了多年。更令人吃惊的是,最近的操作系统性能分析研究表明,可扩展的计数器很重要。没有可扩展的计数,一些运行在 Linux 上的工作在多核机器上将遇到严重的扩展性问题。
  • 尽管人们已经开发了多种技术来解决这一问题,我们将介绍一种特定的方法。这个方法是最近的研究提出的,称为懒惰计数器(sloppy counter)
  • 懒惰计数器通过多个局部计数器和一个全局计数器来实现一个逻辑计数器,其中每个CPU 核心有一个局部计数器。具体来说,在 4 个 CPU 的机器上,有 4 个局部计数器和 1 个全局计数器。除了这些计数器,还有锁:每个局部计数器有一个锁,全局计数器有一个。
  • 懒惰计数器的基本思想是这样的。如果一个核心上的线程想增加计数器,那就增加它的局部计数器,访问这个局部计数器是通过对应的局部锁同步的。因为每个 CPU 有自己的局部计数器,不同 CPU 上的线程不会竞争,所以计数器的更新操作可扩展性好。但是,为了保持全局计数器更新(以防某个线程要读取该值),局部值会定期转移给全局计数器,方法是获取全局锁,让全局计数器加上局部计数器的值,然后将局部计数器置零
  • 这种局部转全局的频度,取决于一个阈值,这里称为 S(表示 sloppiness)。S 越小,懒惰计数器则越趋近于非扩展的计数器。S 越大,扩展性越强,但是全局计数器与实际计数的偏差越大。我们可以抢占所有的局部锁和全局锁(以特定的顺序,避免死锁),以获得精确值,但这种方法没有扩展性。
  • 为了弄清楚这一点,来看一个例子(见表)。在这个例子中,阈值 S 设置为 5,4 个 CPU 上分别有一个线程更新局部计数器 L1,…, L4。随着时间增加,全局计数器 G 的值也会记录下来。每一段时间,局部计数器可能会增加。如果局部计数值增加到阈值 S,就把局部值转移到全局计数器,局部计数器清零。
    20210405134342
  • 上面的曲线图中接近 X 轴的是阈值 S 为 1024 时懒惰计数器的性能。性能很高,4 个处理器更新 400 万次的时间和一个处理器更新 100 万次的几乎一样。
  • 下图展示了了阈值 S 的重要性,在 4 个 CPU 上的 4 个线程,分别增加计数器 100 万次。如果 S 小,性能很差(但是全局计数器精确度高)。如果 S 大,性能很好,但是全局计数器会有延时。懒惰计数器就是在准确性和性能之间折中。
    20210405153232
  • 下面代码就是懒惰计数器的基本实现。
1 typedef struct counter_t {
2   int global; // global count
3   pthread_mutex_t glock; // global lock
4   int local[NUMCPUS]; // local count (per cpu)
5   pthread_mutex_t llock[NUMCPUS]; // ... and locks
6   int threshold; // update frequency
7 } counter_t;
8
9  // init: record threshold, init locks, init values
10 // of all local counts and global count
11 void init(counter_t *c, int threshold) {
12  c->threshold = threshold;
13
14  c->global = 0;
15  pthread_mutex_init(&c->glock, NULL);
16
17  int i;
18  for (i = 0; i < NUMCPUS; i++) {
19      c->local[i] = 0; 
20      pthread_mutex_init(&c->llock[i], NULL);
21  }
22 }
23
24 // update: usually, just grab local lock and update local amount
25 // once local count has risen by 'threshold', grab global
26 // lock and transfer local values to it
27 void update(counter_t *c, int threadID, int amt) {
28  pthread_mutex_lock(&c->llock[threadID]);
29  c->local[threadID] += amt; // assumes amt > 0
30  if (c->local[threadID] >= c->threshold) { // transfer to global
31      pthread_mutex_lock(&c->glock);
32      c->global += c->local[threadID];
33      pthread_mutex_unlock(&c->glock);
34      c->local[threadID] = 0;
35  }
36  pthread_mutex_unlock(&c->llock[threadID]);
37 }
38
39 // get: just return global amount (which may not be perfect)
40 int get(counter_t *c) {
41  pthread_mutex_lock(&c->glock);
42  int val = c->global;
43  pthread_mutex_unlock(&c->glock);
44  return val; // only approximate!
45 } 

Concurrent Linked Lists

  • 接下来看一个更复杂的数据结构,链表。同样,我们从一个基础实现开始。简单起见,
    我们只关注链表的插入操作,其他操作比如查找、删除等就交给读者了。
1 // basic node structure
2 typedef struct node_t {
3   int key;
4   struct node_t *next;
5 } node_t;
6
7 // basic list structure (one used per list)
8 typedef struct list_t {
9   node_t *head;
10  pthread_mutex_t lock;
11 } list_t;
12
13 void List_Init(list_t *L) {
14  L->head = NULL; 
15  pthread_mutex_init(&L->lock, NULL);
16 }
17  
    // 从链表头插入
18 int List_Insert(list_t *L, int key) {
19  pthread_mutex_lock(&L->lock);
20  node_t *new = malloc(sizeof(node_t));
21  if (new == NULL) {
22      perror("malloc");
23      pthread_mutex_unlock(&L->lock);
24      return -1; // fail
25  }
26  new->key = key;
27  new->next = L->head;
28  L->head = new;
29  pthread_mutex_unlock(&L->lock);
30  return 0; // success
31 }
32
33 int List_Lookup(list_t *L, int key) {
34  pthread_mutex_lock(&L->lock);
35  node_t *curr = L->head;
36  while (curr) {
37      if (curr->key == key) {
38          pthread_mutex_unlock(&L->lock);
39          return 0; // success
40      }
41      curr = curr->next;
42  }
43  pthread_mutex_unlock(&L->lock);
44  return -1; // failure
45 }
  • 从代码中可以看出,代码插入函数入口处获取锁,结束时释放锁。如果 malloc 失败(在极少的时候),会有一点小问题,在这种情况下,代码在插入失败之前,必须释放锁。事实表明,这种异常控制流容易产生错误。最近一个 Linux 内核补丁的研究表明,有40%都是这种很少发生的代码路径(实际上,这个发现启发了我们自己的一些研究,我们从 Linux 文件系统中移除了所有内存失败的路径,得到了更健壮的系统)。
  • 因此,挑战来了:我们能够重写插入和查找函数,保持并发插入正确,但避免在失败情况下也需要调用释放锁吗?
  • 在这个例子中,答案是可以。具体来说,我们调整代码,让获取锁和释放锁只环绕插入代码的真正临界区。 前面的方法有效是因为部分工作实际上不需要锁,假定 malloc()是线程安全的,每个线程都可以调用它,不需要担心竞争条件和其他并发缺陷。只有在更新共享列表时需要持有锁
1 void List_Init(list_t *L) {
2   L->head = NULL;
3   pthread_mutex_init(&L->lock, NULL);
4 }
5
6 void List_Insert(list_t *L, int key) {
7   // synchronization not needed
8   node_t *new = malloc(sizeof(node_t));
9   if (new == NULL) {
10      perror("malloc");
11      return;
12  }
13  new->key = key;
14
15  // just lock critical section
16  pthread_mutex_lock(&L->lock);
17  new->next = L->head;
18  L->head = new;
19  pthread_mutex_unlock(&L->lock);
20 }
21
22 int List_Lookup(list_t *L, int key) {
23  int rv = -1;
24  pthread_mutex_lock(&L->lock);
25  node_t *curr = L->head;
26  while (curr) {
27      if (curr->key == key) {
28          rv = 0;
29          break;
30      }
31      curr = curr->next;
32  }
33  pthread_mutex_unlock(&L->lock);
34  return rv; // now both success and failure
35 } 

Scaling Linked Lists

  • 尽管我们有了基本的并发链表,但又遇到了这个链表扩展性不好的问题。研究人员发现的增加链表并发的技术中,有一种叫作过手锁(hand-over-hand locking,也叫作锁耦合,lock coupling)。
  • 原理也很简单。每个节点都有一个锁,替代之前整个链表一个锁。遍历链表的时候,首先抢占下一个节点的锁,然后释放当前节点的锁。
  • 从概念上说,过手锁链表有点道理,它增加了链表操作的并发程度。但是实际上,在遍历的时候,每个节点获取锁、释放锁的开销巨大,很难比单锁的方法快。即使有大量的线程和很大的链表,这种并发的方案也不一定会比单锁的方案快。也许某种杂合的方案(一定数量的节点用一个锁)值得去研究。
  • 如果方案带来了大量的开销(例如,频繁地获取锁、释放锁),那么高并发就没有什么意义。如果简单的方案很少用到高开销的调用,通常会很有效。增加更多的锁和复杂性可能会适得其反。话虽如此,有一种办法可以获得真知:实现两种方案(简单但少一点并发,复杂但多一点并发),测试它们的表现。毕竟,你不能在性能上作弊。结果要么更快,要么不快。
  • 有一个通用建议,对并发代码和其他代码都有用,即注意控制流的变化导致函数返回和退出,或其他错误情况导致函数停止执行。因为很多函数开始就会获得锁,分配内存,或者进行其他一些改变状态的操作,如果错误发生,代码需要在返回前恢复各种状态,这容易出错。因此,最好组织好代码,减少这种模式。

Concurrent Queues

  • 你现在知道了,总有一个标准的方法来创建一个并发数据结构:添加一把大锁。对于一个队列,我们将跳过这种方法。我们来看看 Michael 和 Scott 设计的、更并发的队列。
  • 仔细研究这段代码,你会发现有两个锁,一个负责队列头,另一个负责队列尾。这两个锁使得入队列操作和出队列操作可以并发执行,因为入队列只访问 tail 锁,而出队列只访问 head 锁。
  • Michael 和 Scott 使用了一个技巧,添加了一个假节点(在队列初始化的代码里分配的)。该假节点分开了头和尾操作。研究这段代码,或者输入、运行、测试它,以便更深入地理解它。
  • 队列在多线程程序里广泛使用。然而,这里的队列(只是加了锁)通常不能完全满足这种程序的需求。更完善的有界队列,在队列空或者满时,能让线程等待。这是下一章探讨条件变量时集中研究的主题。读者需要看仔细了!
1 typedef struct __node_t {
2   int value;
3   struct __node_t *next;
4 } node_t;
5
6 typedef struct __queue_t {
7   node_t *head;
8   node_t *tail;
9   pthread_mutex_t head_lock, tail_lock;
10 } queue_t;
11
12 void Queue_Init(queue_t *q) {
13  node_t *tmp = malloc(sizeof(node_t));
14  tmp->next = NULL;
15  q->head = q->tail = tmp;
16  pthread_mutex_init(&q->head_lock, NULL);
17  pthread_mutex_init(&q->tail_lock, NULL);
18 }
19
20 void Queue_Enqueue(queue_t *q, int value) {
21  node_t *tmp = malloc(sizeof(node_t));
22  assert(tmp != NULL);
23  tmp->value = value;
24  tmp->next = NULL;
25
26  pthread_mutex_lock(&q->tail_lock);
27  q->tail->next = tmp;
28  q->tail = tmp;
29  pthread_mutex_unlock(&q->tail_lock);
30 }
31
32 int Queue_Dequeue(queue_t *q, int *value) {
33  pthread_mutex_lock(&q->head_lock);
34  node_t *tmp = q->head;
35  node_t *new_head = tmp->next;
36  if (new_head == NULL) {
37      pthread_mutex_unlock(&q->head_lock);
38      return -1; // queue was empty
39  }
40  *value = new_head->value;
41  q->head = new_head;
42  pthread_mutex_unlock(&q->head_lock);
43  free(tmp);
44  return 0;
45 }

Concurrent Hash Table

  • 我们讨论最后一个应用广泛的并发数据结构,散列表。我们只关注不需要调整大小的简单散列表。支持调整大小还需要一些工作,留给读者作为练习。
  • 本例的散列表使用我们之前实现的并发链表,性能特别好。每个散列桶(每个桶都是一个链表)都有一个锁,而不是整个散列表只有一个锁,从而支持许多并发操作。
1 #define BUCKETS (101)
2
3 typedef struct __hash_t {
4   list_t lists[BUCKETS];
5 } hash_t;
6
7 void Hash_Init(hash_t *H) {
8   int i;
9   for (i = 0; i < BUCKETS; i++)
10      List_Init(&H->lists[i]);
11 }
12
13 int Hash_Insert(hash_t *H, int key) {
14  return List_Insert(&H->lists[key % BUCKETS], key);
15 }
16
17 int Hash_Lookup(hash_t *H, int key) {
18  return List_Lookup(&H->lists[key % BUCKETS], key);
19 }

  • 下图展示了并发更新下的散列表的性能(同样在 4 CPU 的 iMac,4 个线程,每个线程分别执行 1 万~5 万次并发更新)。同时,作为比较,我们也展示了单锁链表的性能。可以看出,这个简单的并发散列表扩展性极好,而链表则相反。
    20210405160416
  • 建议:避免不成熟的优化(Knuth 定律)
  • 实现并发数据结构时,先从最简单的方案开始,也就是加一把大锁来同步。这样做,你很可能构建了正确的锁。如果发现性能问题,那么就改进方法,只要优化到满足需要即可。正如 Knuth 的著名说法“不成熟的优化是所有坏事的根源。”
  • 许多操作系统,在最初过渡到多处理器时都是用一把大锁,包括 Sun 和 Linux。在 Linux 中,这个锁甚至有个名字,叫作 BKL(大内核锁,big kernel lock)。这个方案在很多年里都很有效,直到多 CPU 系统普及,内核只允许一个线程活动成为性能瓶颈。终于到了为这些系统优化并发性能的时候了。Linux 采用了简单的方案,把一个锁换成多个。Sun 则更为激进,实现了一个最开始就能并发的新系统,Solaris。读者可以通过 Linux 和 Solaris 的内核资料了解更多信息。

Summary

  • 我们已经介绍了一些并发数据结构,从计数器到链表队列,最后到大量使用的散列表。同时,我们也学习到:控制流变化时注意获取锁和释放锁;增加并发不一定能提高性能;有性能问题的时候再做优化。关于最后一点,避免不成熟的优化(premature optimization),对于所有关心性能的开发者都有用。我们让整个应用的某一小部分变快,却没有提高整体性能,其实没有价值。
  • 当然,我们只触及了高性能数据结构的皮毛。Moir 和 Shavit 的调查提供了更多信息,包括指向其他来源的链接。特别是,你可能会对其他结构感兴趣(比如 B 树),那么数据库课程会是一个不错的选择。你也可能对根本不用传统锁的技术感兴趣。这种非阻塞数据结构是有意义的,在常见并发问题的章节中,我们会稍稍涉及。但老实说这是一个广泛领域的知识,远非本书所能覆盖。感兴趣的读者可以自行研究。