【MTI 6.S081 Lab】locks
- Memory allocator (moderate)
- 实验任务
- Hint
- 解决方案
 
- Buffer cache (hard)
- 实验任务
- Hint
- 解决方案
- 数据结构设计
- 初始化数据结构
- get
- relse
 
 
本实验前去看《操作系统导论》第29章基于锁的并发数据结构,将会是很有帮助的。
在这个实验室中,您将获得重新设计代码以提高并行性的经验。多核机器上并行性差的一个常见问题是锁的高竞争。提高并行性通常需要更改数据结构和锁定策略,以减少争用。您将为xv6内存分配器和块缓存执行此操作。
Before writing code, make sure to read the following parts from the xv6 book :
- Chapter 6: “Locking” and the corresponding code.
- Section 3.5: “Code: Physical memory allocator”
- Section 8.1 through 8.3: “Overview”, “Buffer cache layer”, and “Code: Buffer cache”
Memory allocator (moderate)
程序user/kaloctest强调xv6的内存分配器:三个进程的地址空间不断增加和缩小,导致对kalloc和kfree的多次调用。kalloc和kfree获得kmem.lock。kalloctest打印(作为“#test and set”)由于试图获取另一个核心已经持有的锁而导致的获取中的循环迭代次数,用于kmem锁和其他一些锁。捕获中的循环迭代次数是锁争用的粗略度量。在你开始实验室之前,kalloctest的输出看起来与此类似:

您可能会看到与此处显示的计数不同的计数,以及前5个争用锁的不同顺序。
acquire为每个锁维护要为该锁获取的调用计数,以及acquire中的循环尝试但未能设置锁的次数。kalloctest调用一个系统调用,使内核打印kmem和bcache锁(这是本实验的重点)以及5个争用最多的锁的计数。如果存在锁争用,则获取循环迭代的次数将很大。系统调用返回kmem和bcache锁的循环迭代次数之和。
对于这个实验,您必须使用一台带有多个核心的专用空载机器。如果你使用一台正在做其他事情的机器,kalloctest打印的计数将是无稽之谈。你可以使用专用的Athena工作站,或者你自己的笔记本电脑,但不要使用拨号机。
kalloctest中锁争用的根本原因是kalloc()有一个单独的空闲列表,由一个锁保护。要消除锁争用,您必须重新设计内存分配器,以避免出现单个锁和列表。基本思想是为每个CPU维护一个空闲列表,每个列表都有自己的锁。不同CPU上的分配和释放可以并行运行,因为每个CPU将在不同的列表上运行。主要的挑战将是处理这样的情况:一个CPU的空闲列表是空的,但另一个CPU列表有空闲内存;在这种情况下,一个CPU必须“窃取”另一个CPU空闲列表的一部分。窃取可能会引入锁争用,但希望这种情况不会发生。
实验任务
你的工作是实现每个CPU的空闲列表,并在CPU的空闲清单为空时进行窃取。您必须提供所有以“kmem”开头的锁名称。也就是说,您应该为每个锁调用initlock,并传递一个以“kmem”开头的名称。运行kalloctest查看您的实现是否减少了锁争用。要检查它是否仍然可以分配所有内存,请运行usertests sbrkmuch。您的输出将与下面显示的类似,kmem锁上的争用总数大大减少,尽管具体数字会有所不同。确保usertests-q中的所有测试都通过。Make grade应该表示kalloctests通过。
Hint
-  您可以使用kernel/param.h中的常量NCPU 
-  让freerange给运行freerange的CPU所有可用内存。 
-  函数cpuid返回当前的核心编号,但只有在中断关闭时调用它并使用其结果才是安全的。您应该使push_off()和pop_off(()来关闭和打开中断。 
-  看看kernel/ssprintf.c中的snprintf函数,了解字符串格式化的想法。不过,可以将所有锁命名为“kmem”。 
-  可以选择使用xv6的竞争检测器运行您的解决方案: $ make clean $ make KCSAN=1 qemu $ kalloctestkalloctest可能会失败,但你不应该看到任何竞争。如果xv6的竞争检测器观察到竞争,它将沿着以下行打印描述竞争的两个堆栈跟踪: 在您的操作系统上,您可以通过将回溯剪切并粘贴到addr2line中,将其转换为带有行号的函数名: 您不需要运行竞争检测器,但您可能会发现它很有帮助。请注意,竞争检测器会显著降低xv6的速度,因此您可能不想在运行用户测试时使用它。 
解决方案
void
freerange(void *pa_start, void *pa_end)
{
  char *p;
  p = (char*)PGROUNDUP((uint64)pa_start);
  int id = 0;
  struct run *r;
  memset(p, 1, (char*)pa_end - p);
  for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE) {
    r = (struct run*)p;
    r->next = kmem[id].freelist;
    kmem[id].freelist = r;
  }
}
void *
kalloc(void)
{
  struct run *r;
  push_off();
  int id = cpuid();
  pop_off();
  acquire(&kmem[id].lock);
  if (!kmem[id].freelist) {
    int flag = 1;
    while (flag) {
      flag = 0;
      for (int i = 0; i < NCPU ; ++i) {
        if (i == id) {
          continue;
        }
        if (kmem[i].freelist) { // 先检测一次,避免锁中等待
          // 这一轮中检测到有空闲的,如果没有分到,说明可能是自旋锁等待太久了,且当前及之后的i都被分配了
          // 但是由于在这里等了很久,那么前面的可能有释放了,所以我们可以再次循环一次
          // 如果flag一直等于0,说明没进入这里,那么说明系统的page实在是太少了,因为NCPU的if语句将会执行非常快,时间几乎可以忽略
          // 甚至运行时间没有个kfree的时间长,所以此时说明系统真的是没有多余的page了
          // 当然也会有一定的产生错误的可能,但是此时系统中的page必然很少,甚至于每个cpu上只有几个page
          // 这个错误,我们就不关注了,在这种情况下,系统告诉程序没有内存可分配了,也是合理的
          // 经过这个修改后,kalloctest便没有出错了,不然多运行几次,总会在test3出现错误,说明当时没有可分配的
          // 但是对于test3来说,应该要是有可以分配的。
          flag = 1; 
          acquire(&kmem[i].lock);
          if (kmem[i].freelist) {
            kmem[id].freelist = kmem[i].freelist;
            // steal half of memory
            struct run *fast = kmem[i].freelist; // faster pointer
            struct run *slow = kmem[i].freelist;
            struct run *pre = 0;
            while (fast && fast->next) {
              fast = fast->next->next;
              pre = slow;
              slow = slow->next;
            }
            if (pre == 0) {
              // only have one page
              kmem[i].freelist = 0;
            }
            else {
              kmem[i].freelist = slow;
              pre->next = 0;
            }
            release(&kmem[i].lock);
            break;
          }
          release(&kmem[i].lock);
        }
      }
      if (kmem[id].freelist) {
        break;
      }
    }
  }
  r = kmem[id].freelist;
  if(r) {
    kmem[id].freelist = r->next;
  }
  release(&kmem[id].lock);
  if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
  return (void*)r;
}
Buffer cache (hard)
这一半任务独立于前一半任务;无论你是否完成了前半部分,你都可以在这半部分工作(并通过测试)。
如果多个进程密集使用文件系统,它们可能会争夺bcache.lock,后者保护kernel/bio.c中的磁盘块缓存。bcachetest创建了多个进程,这些进程重复读取不同的文件,以在bcache.lock上产生争用;它的输出是这样的(在你完成这个实验室之前):
您可能会看到不同的输出,但bcache锁的测试和设置数量会很高。如果您查看kernel/bio.c中的代码,您会发现bcache.lock保护缓存的块缓冲区列表、每个块缓冲区中的引用计数(b->refcnt)以及缓存块的标识(b->dev和b->blockno)。
实验任务
修改块缓存,以便在运行bcachetest时,bcache中所有锁的获取循环迭代次数接近于零。理想情况下,块缓存中涉及的所有锁的计数之和应该为零,但如果总和小于500也可以。修改bget和brelse,使bcache中不同块的并发查找和释放不太可能在锁上发生冲突(例如,不必全部等待bcache.lock)。您必须保持不变,即每个块最多缓存一个副本。完成后,您的输出应该与下面显示的类似(尽管不完全相同)。确保“usertests-q”仍然通过。当你完成所有测试时,make grade应该通过所有测试。

请给出所有以“bcache”开头的锁名。也就是说,您应该为每个锁调用initlock,并传递一个以“bcache”开头的名称。
减少块缓存中的争用比kalloc更为棘手,因为bcache缓冲区在进程(以及CPU)之间是真正共享的。对于kalloc,可以通过给每个CPU自己的分配器来消除大多数争用;这对块缓存不起作用。我们建议您使用每个哈希桶都有锁的哈希表在缓存中查找块号。
在某些情况下,如果您的解决方案存在锁冲突,也可以:
- 当两个进程同时使用相同的块号时。bcachetest test0从不执行此操作。
- 当两个进程同时在缓存中未命中,并且需要找到一个未使用的块来替换时。bcachetest test0从不执行此操作。
- 当两个进程同时使用冲突的块时,无论您使用什么方案来划分块和锁;例如,如果两个进程使用的块的块号哈希到哈希表中的同一个槽。bcachetesttest0可能会这样做,这取决于您的设计,但您应该尝试调整方案的详细信息以避免冲突(例如,更改哈希表的大小)。
bcachetest的test1使用了比缓冲区更多的不同块,并练习了许多文件系统代码路径。
Hint
- 阅读xv6手册中关于块缓存的描述(第8.1-8.3节)。
- 可以使用固定数量的bucket,而不动态调整哈希表的大小。使用素数的桶(例如,13)来减少散列冲突的可能性。
- 在哈希表中搜索缓冲区并在找不到缓冲区时为该缓冲区分配条目必须是原子的。
- 删除所有缓冲区的列表(bcache.head等),不要实现LRU。通过此更改,brelse不需要获取bcache锁。在bget中,您可以选择refcnt==0的任何块,而不是最近使用最少的块。
- 您可能无法以原子方式检查一个已经缓存的buf和如果未缓存找一个未使用的buf;如果缓冲区不在缓存中,则可能需要删除所有锁并从头开始。在bget中串行查找未使用的buf(即,当查找在缓存中未命中时,bget中选择要重用的缓冲区的部分)是可以的。
- 在某些情况下,您的解决方案可能需要持有两个锁;例如,在驱逐过程中,您可能需要持有bcache锁和每个bucket一个锁。确保避免死锁。
- 当替换一个块时,您可能会将struct buf从一个bucket移动到另一个bucket,因为新的块会散列到不同的bucket。您可能会遇到一个棘手的情况:新块可能会与旧块哈希到同一个bucket。在这种情况下,一定要避免死锁。
- 一些调试技巧:实现bucket锁,但将全局bcache.lock获取/释放留在bget的开始/结束处,以序列化代码。一旦您确定它在没有竞争条件的情况下是正确的,就删除全局锁并处理并发问题。您还可以运行makeCPUS=1qemu来使用一个核心进行测试。
- 使用xv6的竞争检测器来查找潜在的竞争(请参阅上面如何使用竞争检测器)。
解决方案
数据结构设计
哈希桶为素数,有利于block在哈希表中均匀分布。
#define HASH_BUCKETS 13
struct {
  struct spinlock lock;
  struct buf *buf; 
  char name[128];
} hash_buckets[HASH_BUCKETS];
static int
hash(int blockbo) {
  return blockbo % HASH_BUCKETS;
}
初始化数据结构
void
binit(void)
{
  // struct buf *b;
  initlock(&bcache.lock, "bcache");
  for (int i = 0; i < HASH_BUCKETS; ++i) {
    snprintf(hash_buckets[i].name, sizeof(hash_buckets[i].name), "hash_buckets_%d", i);
    initlock(&hash_buckets[i].lock, hash_buckets[i].name);
  }
  for (int i = 0; i < NBUF - 1; ++i) {
    bcache.buf[i].next = &bcache.buf[i+1];
    bcache.buf[i+1].prev = &bcache.buf[i];
  }
  hash_buckets[0].buf = bcache.buf;
}
全部将其放入0号桶,这可能会在开始时造成一些竞争。
get
acquire(&hash_buckets[rdx].lock);这里能获得锁的原因是,不会同时有两个进入这个区域,且b一定不再桶idx中。因为之前我们已经搜索过idx了,里面没有,而我们又是一直有idx的锁,所以不可能从无变成有。同时,rdx中有,那么rdx的锁不可能走到第34行处,所以也不会产生死锁。
获得锁后,再检测一次,避免在break到现在的空隙,别的进程已经更改了这个block的状态。如果状态已经改变,那么就去refind处重新查找
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;
  int idx = hash(blockno);
  acquire(&hash_buckets[idx].lock);
  for (b = hash_buckets[idx].buf; b; b = b->next) {
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&hash_buckets[idx].lock);
      acquiresleep(&b->lock);
      return b;
    }
  }
  // 处理冲突,在空闲中找
  // 首先在当前找
  for (b = hash_buckets[idx].buf; b; b = b->next) {
    if (b->refcnt == 0) {
      // 可以移除,但是由于hash值一样,那么就在此bucket中
      b->dev = dev;
      b->blockno = blockno;
      b->valid = 0;
      b->refcnt = 1;
      release(&hash_buckets[idx].lock);
      acquiresleep(&b->lock);
      return b;
    }
  }
  acquire(&bcache.lock);
refind:
  b = 0;
  for (int i = 0; i < NBUF; ++i) {
    if (bcache.buf[i].refcnt == 0) {
      b = &bcache.buf[i];
      break;
    }
  }
  if (!b) {
    panic("bget: no buffers");
  }
  int rdx = hash(b->blockno);
  // 这里能获得锁的原因是,不会同时有两个进入这个区域,且b一定不再桶idx中
  // 因为之前我们已经搜索过idx了,里面没有,而我们又是一直有idx的锁,所以不可能从无变成有
  // 同时,rdx中有,那么rdx的锁不可能走到第34行处,所以也不会产生死锁
  acquire(&hash_buckets[rdx].lock);
  if (b->refcnt != 0) {	// 获得锁后,再检测一次,避免在break到现在的空隙,别的进程已经更改了这个block的状态
    release(&hash_buckets[rdx].lock);
    goto refind;
  }
  // 从bucket中移除
  if(b->prev) {
    b->prev->next = b->next;
  } 
  if (b->next) {
    b->next->prev = b->prev;
  }
  if (hash_buckets[rdx].buf == b) {
    // 去除第一个
    hash_buckets[rdx].buf = b->next;
  }
  release(&hash_buckets[rdx].lock);
  // 加入idx中
  if(hash_buckets[idx].buf) {
    b->next = hash_buckets[idx].buf;
    hash_buckets[idx].buf->prev = b;
    b->prev = 0;
    hash_buckets[idx].buf = b;
  } else {
    b->next = 0;
    b->prev = 0;
    hash_buckets[idx].buf = b;
  }
  b->valid = 0;
  b->refcnt = 1;
  b->dev = dev;
  b->blockno = blockno;
  release(&bcache.lock);
  release(&hash_buckets[idx].lock);
  acquiresleep(&b->lock);
  return b;
}
relse
在brelse中,首先,b此时别的进程可以用了,所有releasesleep。然后锁住所在的桶,将block的引用减1。此时不用担心b->refcnt会变为负数,因为b->refcnt的更新是原子的,且get后才能release,所以必然是先加然后减,那么就不用担心其变为负数了。
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{
  if(!holdingsleep(&b->lock))
    panic("brelse");
  releasesleep(&b->lock);
  int idx = hash(b->blockno);
  acquire(&hash_buckets[idx].lock);
  b->refcnt--;
  release(&hash_buckets[idx].lock);
}
void
bpin(struct buf *b) {
  int idx = hash(b->blockno);
  acquire(&hash_buckets[idx].lock);
  b->refcnt++;
  release(&hash_buckets[idx].lock);
}
void
bunpin(struct buf *b) {
  int idx = hash(b->blockno);
  acquire(&hash_buckets[idx].lock);
  b->refcnt--;
  release(&hash_buckets[idx].lock);
}



















