背景 调研以往的优秀作品中,大部分的任务调度队列使用的是全局队列,即将所有任务都放在一个fifo队列中,这样可以简化调度策略,方便代码设计。但是这样设计的缺陷也是显而易见,在多核情况下,每个核add、fetch任务都需要对全局队列进行加锁,这样会造成大量的锁竞争和等待,降低cpu利用率。
为了提高性能减少锁竞争,我们在xv6 lab8:lock的提示下想到可以将全局队列拆分给每个核的单独队列,同时保留一个全局优先队列。
设计思路 基于工作窃取机制,每个核会先从全局优先队列中fetch任务,这里面放的是被信号唤醒的任务;如果没有优先任务,那么再从自己的队列中取任务执行;这样极大地减少了锁竞争的出现。但是这里又会出现一个极端情况,当存在大量的优先任务时,普通任务会出现饥饿现象,同时这样的设计也会退化到最初版本。所以需要为工作窃取机制找到合适的工作场景才能最大化提高性能。
暂时的实现:
#![no_std] extern crate alloc;use alloc::collections::VecDeque;use core::future::Future;use core::sync::atomic::{AtomicUsize, Ordering};use async_task::{Runnable, ScheduleInfo, Task, WithInfo};use crate::sync::SpinNoIrqLock;const NUM_WORKERS: usize = 4 ; static TASK_QUEUE: TaskQueue = TaskQueue::new ();struct TaskQueue { normal: [SpinNoIrqLock<VecDeque<Runnable>>; NUM_WORKERS], prior: SpinNoIrqLock<VecDeque<Runnable>>, next_worker: AtomicUsize, } impl TaskQueue { pub const fn new () -> Self { const EMPTY_QUEUE: SpinNoIrqLock<VecDeque<Runnable>> = SpinNoIrqLock::new (VecDeque::new ()); Self { normal: [EMPTY_QUEUE; NUM_WORKERS], prior: SpinNoIrqLock::new (VecDeque::new ()), next_worker: AtomicUsize::new (0 ), } } pub fn push_normal (&self , runnable: Runnable) { let worker_id = self .next_worker.fetch_add (1 , Ordering::Relaxed) % NUM_WORKERS; self .normal[worker_id].lock ().push_back (runnable); } pub fn push_prior (&self , runnable: Runnable) { self .prior.lock ().push_back (runnable); } pub fn fetch (&self , worker_id: usize ) -> Option <Runnable> { if let Some (task) = self .prior.lock ().pop_front () { return Some (task); } if let Some (task) = self .normal[worker_id].lock ().pop_front () { return Some (task); } for i in 0 ..NUM_WORKERS { if i != worker_id { if let Some (task) = self .normal[i].lock ().pop_front () { return Some (task); } } } None } pub fn len (&self ) -> usize { let prior_len = self .prior.lock ().len (); let normal_len = self .normal.iter ().map (|q| q.lock ().len ()).sum::<usize >(); prior_len + normal_len } pub fn is_empty (&self ) -> bool { self .prior.lock ().is_empty () && self .normal.iter ().all (|q| q.lock ().is_empty ()) } } pub fn spawn <F>(future: F)where F: Future + Send + 'static , F::Output: Send + 'static , { let schedule = move |runnable: Runnable, info: ScheduleInfo| { if info.woken_while_running { TASK_QUEUE.push_normal (runnable); } else { TASK_QUEUE.push_prior (runnable); } }; let (runnable, task) = async_task::spawn (future, WithInfo (schedule)); runnable.schedule (); task.detach (); } pub fn run (worker_id: usize ) { while let Some (task) = TASK_QUEUE.fetch (worker_id) { task.run (); } }
在一些情境下,我们可以增加队列数量,这会带来如下好处:
更好的I/O密集型任务处理 :
如果任务中有较多的I/O操作(如文件读写、网络请求等),多Worker
可以在某些Worker
等待I/O时,切换到其他任务,提高CPU利用率。
更高的任务并发性 :
更多的Worker
可以同时处理更多的任务,适合任务量较大的场景。
灵活性 :
可以根据任务类型动态调整Worker
的数量,例如为高优先级任务分配更多的Worker
。
==但是注意:这并不代表worker数量越多越好,过多的worker可能会增加对全局优先队列的锁竞争==