背景

调研以往的优秀作品中,大部分的任务调度队列使用的是全局队列,即将所有任务都放在一个fifo队列中,这样可以简化调度策略,方便代码设计。但是这样设计的缺陷也是显而易见,在多核情况下,每个核add、fetch任务都需要对全局队列进行加锁,这样会造成大量的锁竞争和等待,降低cpu利用率。

为了提高性能减少锁竞争,我们在xv6 lab8:lock的提示下想到可以将全局队列拆分给每个核的单独队列,同时保留一个全局优先队列。

设计思路

基于工作窃取机制,每个核会先从全局优先队列中fetch任务,这里面放的是被信号唤醒的任务;如果没有优先任务,那么再从自己的队列中取任务执行;这样极大地减少了锁竞争的出现。但是这里又会出现一个极端情况,当存在大量的优先任务时,普通任务会出现饥饿现象,同时这样的设计也会退化到最初版本。所以需要为工作窃取机制找到合适的工作场景才能最大化提高性能。

暂时的实现:

#![no_std]

extern crate alloc;

use alloc::collections::VecDeque;
use core::future::Future;
use core::sync::atomic::{AtomicUsize, Ordering};
use async_task::{Runnable, ScheduleInfo, Task, WithInfo};
use crate::sync::SpinNoIrqLock;

const NUM_WORKERS: usize = 4; // 有4个Worker,可以多于cpu数量

static TASK_QUEUE: TaskQueue = TaskQueue::new();

struct TaskQueue {
normal: [SpinNoIrqLock<VecDeque<Runnable>>; NUM_WORKERS], // 每个Worker一个普通队列
prior: SpinNoIrqLock<VecDeque<Runnable>>, // 全局优先队列
next_worker: AtomicUsize, // 用于轮询分配任务
}

impl TaskQueue {
pub const fn new() -> Self {
const EMPTY_QUEUE: SpinNoIrqLock<VecDeque<Runnable>> = SpinNoIrqLock::new(VecDeque::new());
Self {
normal: [EMPTY_QUEUE; NUM_WORKERS],
prior: SpinNoIrqLock::new(VecDeque::new()),
next_worker: AtomicUsize::new(0),
}
}

/// 将普通任务分配到某个Worker队列中
pub fn push_normal(&self, runnable: Runnable) {
let worker_id = self.next_worker.fetch_add(1, Ordering::Relaxed) % NUM_WORKERS;
self.normal[worker_id].lock().push_back(runnable);
}

/// 将优先任务推送到全局优先队列
pub fn push_prior(&self, runnable: Runnable) {
self.prior.lock().push_back(runnable);
}

/// 从队列中获取任务,优先从优先队列中获取,然后从当前Worker队列中获取,最后尝试窃取其他Worker的任务
pub fn fetch(&self, worker_id: usize) -> Option<Runnable> {
// 首先从优先队列中获取任务
if let Some(task) = self.prior.lock().pop_front() {
return Some(task);
}

// 然后从当前Worker队列中获取任务
if let Some(task) = self.normal[worker_id].lock().pop_front() {
return Some(task);
}

// 最后尝试从其他Worker队列中窃取任务
for i in 0..NUM_WORKERS {
if i != worker_id {
if let Some(task) = self.normal[i].lock().pop_front() {
return Some(task);
}
}
}

// 如果没有任务,返回None
None
}

pub fn len(&self) -> usize {
let prior_len = self.prior.lock().len();
let normal_len = self.normal.iter().map(|q| q.lock().len()).sum::<usize>();
prior_len + normal_len
}

pub fn is_empty(&self) -> bool {
self.prior.lock().is_empty() && self.normal.iter().all(|q| q.lock().is_empty())
}
}

/// 将任务加入队列
pub fn spawn<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let schedule = move |runnable: Runnable, info: ScheduleInfo| {
if info.woken_while_running {
TASK_QUEUE.push_normal(runnable);
} else {
TASK_QUEUE.push_prior(runnable);
}
};
let (runnable, task) = async_task::spawn(future, WithInfo(schedule));
runnable.schedule();
task.detach();
}

/// 运行任务队列中的任务
pub fn run(worker_id: usize) {
while let Some(task) = TASK_QUEUE.fetch(worker_id) {
task.run();
}
}

在一些情境下,我们可以增加队列数量,这会带来如下好处:

  • 更好的I/O密集型任务处理
    • 如果任务中有较多的I/O操作(如文件读写、网络请求等),多Worker可以在某些Worker等待I/O时,切换到其他任务,提高CPU利用率。
  • 更高的任务并发性
    • 更多的Worker可以同时处理更多的任务,适合任务量较大的场景。
  • 灵活性
    • 可以根据任务类型动态调整Worker的数量,例如为高优先级任务分配更多的Worker

==但是注意:这并不代表worker数量越多越好,过多的worker可能会增加对全局优先队列的锁竞争==