Multiprocessor and Lock

  • 锁就是一个对象,就像其他在内核中的对象一样。有一个结构体叫做 lock,它包含了一些字段,这些字段中维护了锁的状态。锁有非常直观的 API:

    • acquire,接收指向 lock 的指针作为参数。acquire 确保了在任何时间,只会有一个进程能够成功的获取锁。
    • release,也接收指向 lock 的指针作为参数。在同一时间尝试获取锁的其他进程需要等待,直到持有锁的进程对锁调用 release。
  • 锁应该与操作而不是数据关联,所以自动加锁在某些场景下会出问题

  • 想要程序简单点,可以通过 coarse-grain locking(注,也就是大锁),但是这时你就失去了性能

锁的特性

  • 避免丢失更新
  • 锁可以打包多个操作,使它们具有原子性
  • 可以维护共享数据结构的不变性

challenge

  • 死锁

  • 破坏了程序的模块化

  • 通常来说,开发的流程是:

    • 先以 coarse-grained lock(注,也就是大锁)开始。
    • 再对程序进行测试,来看一下程序是否能使用多核。
    • 如果可以的话,那么工作就结束了,你对于锁的设计足够好了;如果不可以的话,那意味着锁存在竞争,多个进程会尝试获取同一个锁,因此它们将会序列化的执行,性能也上不去,之后你就需要重构程序。

uart 中的锁

  • 向 buffer 中写入数据时加锁
void
uartputc(int c)
{
acquire(&uart_tx_lock);

if(panicked){
for(;;)
;
}
while(uart_tx_w == uart_tx_r + UART_TX_BUF_SIZE){
// buffer is full.
// wait for uartstart() to open up space in the buffer.
sleep(&uart_tx_r, &uart_tx_lock);
}
uart_tx_buf[uart_tx_w % UART_TX_BUF_SIZE] = c;
uart_tx_w += 1;
uartstart();
release(&uart_tx_lock);
}
  • 当 UART 硬件完成传输,会产生一个中断。在前面的代码中我们知道了 uartstart 的调用者会获得锁以确保不会有多个进程同时向 THR 寄存器写数据。但是 UART 中断本身也可能与调用 printf 的进程并行执行。如果一个进程调用了 printf,它运行在 CPU0 上;CPU1 处理了 UART 中断,那么 CPU1 也会调用 uartstart。因为我们想要确保对于 THR 寄存器只有一个写入者,同时也确保传输缓存的特性不变(注,这里指的是在 uartstart 中对于 uart_tx_r 指针的更新),我们需要在中断处理函数中也获取锁。
void
uartintr(void)
{
// read and process incoming characters.
while(1){
int c = uartgetc();
if(c == -1)
break;
consoleintr(c);
}

// send buffered characters.
acquire(&uart_tx_lock);
uartstart();
release(&uart_tx_lock);
}

spin lock

实现方法

  • 特殊的硬件指令
    • 这个特殊的硬件指令会保证一次 test-and-set 操作的原子性。在 RISC-V 上,这个特殊的指令就是 amoswap(atomic memory swap)。
    • 这个指令接收 3 个参数,分别是 address,寄存器 r1,寄存器 r2。这条指令会先锁定住 address,将 address 中的数据保存在一个临时变量中(tmp),之后将 r1 中的数据写入到地址中,之后再将保存在临时变量中的数据写入到 r2 中,最后再对于地址解锁。
struct spinlock {
uint locked; // Is the lock held?

// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
};

void
acquire(struct spinlock *lk)
{
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");

// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
;

// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();

// Record info about lock acquisition for holding() and debugging.
lk->cpu = mycpu();
}

// Release the lock.
void
release(struct spinlock *lk)
{
if(!holding(lk))
panic("release");

lk->cpu = 0;

// Tell the C compiler and the CPU to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other CPUs before the lock is released,
// and that loads in the critical section occur strictly before
// the lock is released.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();

// Release the lock, equivalent to lk->locked = 0.
// This code doesn't use a C assignment, since the C standard
// implies that an assignment might be implemented with
// multiple store instructions.
// On RISC-V, sync_lock_release turns into an atomic swap:
// s1 = &lk->locked
// amoswap.w zero, zero, (s1)
__sync_lock_release(&lk->locked);

pop_off();
}

spin lock 的三个细节

  • 一个 store 指令并不总是一个原子指令,取决于具体的实现

    • 对于 CPU 内的缓存,每一个 cache line 的大小可能大于一个整数,那么 store 指令实际的过程将会是:首先会加载 cache line,之后再更新 cache line。所以对于 store 指令来说,里面包含了两个微指令。这样的话就有可能得到错误的结果。所以为了避免理解硬件实现的所有细节,例如整数操作不是原子的,或者向一个 64bit 的内存值写数据是不是原子的,我们直接使用一个 RISC-V 提供的确保原子性的指令来将 locked 字段写为 0。
  • spinlock 需要处理两类并发,一类是不同 CPU 之间的并发,一类是相同 CPU 上中断和普通程序之间的并发。针对后一种情况,我们需要在 acquire 中关闭中断。中断会在 release 的结束位置再次打开,因为在这个位置才能再次安全的接收中断

  • 第三个细节就是 memory ordering。

    • 假设我们先通过将 locked 字段设置为 1 来获取锁,之后对 x 加 1,最后再将 locked 字段设置 0 来释放锁;编译器会优化指令顺序来提高性能

    • 为了禁止,或者说为了告诉编译器和硬件不要这样做,我们需要使用 memory fence 或者叫做 synchronize 指令,来确定指令的移动范围。对于 synchronize 指令,任何在它之前的 load/store 指令,都不能移动到它之后。锁的 acquire 和 release 函数都包含了 synchronize 指令。