文章目录
- 下半部
- 软中断
- 软中断的实现
- 使用软中断
- tasklet
- tasklet 的实现
- 使用 tasklet
- 工作队列
- 工作队列的实现
- 使用工作队列
下半部
中断处理程序的局限性:
- 中断处理程序以异步方式执行,并且可能打断其它代码,因此为了避免被打断的代码停止时间过长,中断处理程序应该执行得越快越好。
- 当一个中断处理程序正在执行时,最好的情况是没设置 IRQF-DISABLED,因为这样只会屏蔽同级的中断,在最坏的是设置了 IRQF-DISABLED,当前处理上的其它中断都会被屏蔽。禁用中断后硬件与操作系统无法通信,因此中断处理程序执行得越快越好。
- 由于中断处理程序往往需要对硬件进行操作,所以它们通常有很高的时限要求。
- 中断处理程序不能在进程上下文运行,所以它们不能阻塞。这限制了它们所做的事情。
中断处理流程的上半部有快速、异步、简单的机制对硬件中断做出响应。而对于其它的、对时间要求宽松的任务就应该被推后到下半部执行。
什么功能该归类于下半部并没有明确规定,这完全取决于驱动程序开发者自己的判断。但有一些借鉴:
- 对时间非常敏感的任务,放到中断处理程序执行。
- 和硬件相关的任务,放到中断处理程序执行。
- 任务要保证不背其它中断打断,放到中断处理程序执行。
- 其它任务,考虑放到下半部。
下半部并没有一个明确的执行时间,将任务推迟执行,等待系统不太繁忙且中断恢复后执行就可以了。通常下半部会在中断处理程序结束后马上执行。下半部执行的关键在于当它们运动时,允许响应所有中断。
软中断
软中断的实现
软中断的结构
/* softirq mask and active fields moved to irq_cpustat_t in
* asm/hardirq.h to get better cache usage. KAO
*/
struct softirq_action {
void (*action)(struct softirq_action *);
};
每个被注册的软中断都占据该数组的一项,NR_SOFTIRQS 为 32,因此最多可能有 32 个软中断。(这是定值,无法动态改变)
static struct softirq_action softirq_vec[NR_SOFTIRQS];
软中断处理函数
void my_softirq_handler(struct softirq_action *action) {
// 在这里执行软中断处理操作
}
当内核运行一个软中断处理程序时,就会执行 action 函数。如:
// 假设 my_softirq 指向 softirq_vec[x]
my_softirq[x] -> action(my_softirq);
可以看到内核将整个结构体都传递给了软中断处理程序,而不是仅仅传递数据值。这个小技巧可以保证将来在结构体中加入新的域时,无需对所有的软中断处理程序都进行变动。如果需要,软中断处理程序可以方便地解析它的参数,从数据成员中提取数值。
一个软中断不会抢占另一个软中断。实际上,唯一可以抢占软中断的是中断处理程序。不过,其它的软中断(甚至是相同类型的软中断)可以在其它处理器上同时执行。
执行软中断
一个注册的软中断必须在标记后才会执行。这被称作触发软中断。通常,中断处理程序会在返回前标记它的软中断,使其在稍后被执行。在何时的时刻,该软中断就会执行。
如下情况,待处理的软中断会被检查和执行,(或许说成唤醒会更加合适):
- 从一个硬件中断代码处返回
- 在 ksoftirqd 内核线程中
- 在那些显式检查和执行待处理的软中断的代码中,如网络子系统中。
无论是用什么办法唤起,软中断都要在 do_softirq() 中执行。若有待处理的软中断,该函数会遍历每一个,调用它们的处理程序。
使用软中断
分配索引
在编译期间,通过在 linux/interrupt.h 中定义的一个枚举类型来静态地声明软中断。
当需要增加一个新的软中断时,根据优先级(索引)插入,尽量放在 BLOCK_SOFTIRQ 和 TASKLET_SOFTIRQ 之间。
/* PLEASE, avoid to allocate new softirqs, if you need not _really_ high
frequency threaded job scheduling. For almost all the purposes
tasklets are more than enough. F.e. all serial device BHs et
al. should be converted to tasklets, not to softirqs.
*/
enum
{
HI_SOFTIRQ=0,
TIMER_SOFTIRQ,
NET_TX_SOFTIRQ,
NET_RX_SOFTIRQ,
BLOCK_SOFTIRQ,
BLOCK_IOPOLL_SOFTIRQ,
TASKLET_SOFTIRQ,
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */
NR_SOFTIRQS
};
注册软中断处理程序
void open_softirq(int nr, void (*action)(struct softirq_action *));
- nr:软中断的索引号
- action:软中断处理函数
因为软中断处理程序执行时,允许响应中断,但软中断处理程序不可以休眠。在一个处理程序允许的时候,当前处理器上的软中断被禁止。但其它的处理仍然可以执行别的软中断。实际上,若一个软中断在被它执行的同时再次被触发了,那么另一个处理器也可以同时允许其处理程序。这会导致数据被共享,因此需要加锁保护数据。
大部分软中断处理程序,都通过采取单处理器数据(仅属于某一个处理器的数据,因此根本不需要加锁)或其它一些技巧来避免显示地加锁,从而提供更出色的性能。
引入软中断是因为其可扩展性。若不要扩展到多个处理器,那么就使用 tasklet 把。tasklet 本质上也是个软中断,只不过同一个处理程序的多个实例不能在多个处理器上同时运行。
触发软中断
// 将一个软中断设置为挂起状态,让它在下次调用 do_softirq() 函数时投入运行。
void raise_softirq(unsigned int nr);
例如,挂起网络子系统:raise_softirq(NET_TX_SOFTIRQ);
这会触发 NET_TX_SOFTIRQ 软中断。它的处理程序 net_tx_action() 就会在内核下一次执行软中断时投入运行。该函数在触发一个软中断之前先要禁止中断,触发后再恢复原来的状态。若中断被就禁用,那么调用 raise_softirq_irqoff() 会更加合适。
在中断处理程序中触发软中断是最常见的形式。在这种情况下,中断处理程序执行硬件相关的操作,然后触发相应的软中断,最后退出。内核在执行完中断处理程序后,马上就会调用 do_softirq() 函数。于是,软中断开始执行中断处理程序留给它去完成的剩余任务。
tasklet
tasklet 的实现
tasklet 结构体
/* Tasklets --- multithreaded analogue of BHs.
Main feature differing them of generic softirqs: tasklet
is running only on one CPU simultaneously.
Main feature differing them of BHs: different tasklets
may be run simultaneously on different CPUs.
Properties:
* If tasklet_schedule() is called, then tasklet is guaranteed
to be executed on some cpu at least once after this.
* If the tasklet is already scheduled, but its excecution is still not
started, it will be executed only once.
* If this tasklet is already running on another CPU (or schedule is called
from tasklet itself), it is rescheduled for later.
* Tasklet is strictly serialized wrt itself, but not
wrt another tasklets. If client needs some intertask synchronization,
he makes it with spinlocks.
*/
struct tasklet_struct {
struct tasklet_struct *next; // 下一个
unsigned long state; // tasklet 的状态,其有三值
// TASKLET_STATE_SCHED:tasklet 已被调度,准备投入运行
// TASKLET_STATE_RUN:tasklet 正在运行
atomic_t count; // 引用计数器
// != 0,则 tasklet 被禁止,不允许执行
// == 0,tasklet 被激活
void (*func)(unsigned long); // tasklet 的处理函数
unsigned long data; // 作为 func 的参数
};
调度 tasklet
已调度的 tasklet 存放在两个单处理器数据结构:
- tasklet_vec:普通
- tasklet_hi_vec:高级
调度函数:
-
tasklet_schedule:使用 TASKLET_SOFTIRQ
// include/linux/interrupt.h static inline void tasklet_schedule(struct tasklet_struct *t) { // 1.检查 tasklet 的状态是否为 TASKLET_STATE_SCHED。若是,说明 tasklet 已经被调度过了,函数立即返回。 if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_schedule(t); // 2.若不是,则调用 __tasklet_schedule()。 }
// kernel/softirq.c void __tasklet_schedule(struct tasklet_struct *t) { unsigned long flags; local_irq_save(flags); // 3.保存中断状态,且禁用本地中断。(为了保证处理器上的数据不会乱) // -------------------------------------- // 4.把需要调度的 tasklet 加入到每个处理器的一个 tasklet_vec 或 tasklet_hi_vec 链表的表头上去。 t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next); // -------------------------------------- // 5.唤醒 TASKLET_SOFTIRQ 或 HI_SOFTIRQ 软中断,使其下次调用 do_softirq() 时会执行该 tasklet。 raise_softirq_irqoff(TASKLET_SOFTIRQ); // 6.恢复中断状态 local_irq_restore(flags); }
// include/asm-generic/percpu.h #define __get_cpu_var(var) (var)
-
tasklet_hi_schedule:使用 HI_SOFTIRQ
static inline void tasklet_hi_schedule(struct tasklet_struct *t) { if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_hi_schedule(t); }
最后就是通过 do_sofrirq() 执行对应的软中断处理程序:
- tasklet_action()
- tasklet_hi_action()
使用 tasklet
声明 tasklet
静态创建一个 tasklet 的直接引用:
// include/linux/interrupt.h
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
// 下面这个因为计数器 != 0,因此 tasklet 处于禁用状态,需要 tasket_enable() 手动激活
#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
通过一个间接引用(一个指针)赋值给一个动态创建的 tasklet_struct 结构的方式来初始化一个 tasklet_init():
extern void tasklet_init(struct tasklet_struct *t,
void (*func)(unsigned long), unsigned long data);
编写 tasklet 处理程序
格式要求如下:
void tasklet_handler(unsigned long data)
tasklet 依靠软中断实现,因此 tasklet 不能休眠。意味着无法在 tasklet 中使用阻塞函数。
和软中断不同的是,两个相同的 tasklet 绝不会同时执行,即便他们是在不同的处理器上。
调度 tasklet
调用 tasklet_schedule() 函数并且传入相应的 tasklet_struct 指针。
tasklet_schedule(&my_tasklet); // 把 my_tasklet 标记为挂起
这个 taskle 被调度之后,还未运行,此时又有一个相同的 tasklet 又被调度了,那么它仍然只会运行一次,或者说后来的那个无法被调度,因为在 task_schedule() 中已经做了判断。而如果它已经开始执行,那么这个新的 tasklet 将被重新调度并再次运行。
tasklet 只要开始执行,就不会自主停止,这时希望更好地利用处理器的高速缓存。—— 调用 tasklet_disable() 来解决。
若 taskel 未执行完成,则会等待执行完成后再进行停止。tasklet_disable_nosync() 就无需如此。
tasklet_enable 可以激活一个 tasklet。
tasklet_kill() 从挂起的队列中去掉一个 tasklet。由于该函数可能引起休眠,因此禁止在中断上下文中使用它。
ksoftirqd
每个处理器都有一组辅助处理软中断的内核线程。
工作队列
工作队列子系统是一个用于创建内核线程的接口,通过它创建的进程负责执行由内核其它部分排到队列里的任务。它创建的这些内核线程称作工作者线程。
工作队列可以让你的驱动程序创建一个专门的工作者线程来处理需要推后的工作。
工作队列的实现
工作者线程使用 workqueue_struct 结构体表示:
// kernel/workqueue.c
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq; // 数组指针,每一项对应系统中的一个处理器
struct list_head list;
const char *name;
int singlethread;
int freezeable; /* Freeze threads during suspend */
int rt;
};
每个处理器对应一个工作者线程,而每个工作者线程都有一个 cpu_workqueue_struct 结构体。
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu).
*/
struct cpu_workqueue_struct {
spinlock_t lock;
struct list_head worklist; // 工作列表
wait_queue_head_t more_work;
struct work_struct *current_work; // 工作链表
struct workqueue_struct *wq; // 关联工作队列结构
struct task_struct *thread; // 关联线程
} ____cacheline_aligned;
表示工作的数据结构
所有的工作者线程都是用普通内核线程实现的,它们都要执行 worker_thread() 函数。在它初始化完后,该函数执行一个死循环并开始休眠。当有操作被插入到队列里的时候,线程就会被唤醒,以便执行这些操作。当没有剩余的操作时,它又会继续休眠。
struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
struct lockdep_map lockdep_map;
};
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
if (cwq->wq->freezeable)
set_freezable();
for (;;) { // 核心代码 —— 死循环
// 1. 线程将自己设置为休眠状态(state 被设置为 TASK_INTERRUPTIBLE),并把自己加入到等待队列中。
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) &&
!kthread_should_stop() &&
// 2. 判断工作链表是否为空
list_empty(&cwq->worklist))
schedule(); // 2.1 工作链表为空,调用 schedule() 进入后休眠状态
// schedule() 内找时机将自己设置成 TASK_RUNNING,脱离等待队列进入就绪队列。
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
if (kthread_should_stop())
break;
// 2.2 工作链表非空,调用 run_workqueue() 函数执行被推后的工作
run_workqueue(cwq);
}
return 0;
}
接着由 run_workqueue() 函数来实际完成推后执行的工作。
这里的 2.6 的实现与书上不符
工作队列实现机制的总结
使用工作队列
创建需要推后的工作
静态创建:
// include/linux/workqueue.h
#define DECLARE_WORK(n, f) \
struct work_struct n = __WORK_INITIALIZER(n, f)
动态创建:
// include/linux/workqueue.h
#define INIT_WORK(_work, _func) \
do { \
__INIT_WORK((_work), (_func), 0); \
} while (0)
工作队列处理函数
要求格式如下:
void work_handler(void *data)
该函数会由一个工作者线程执行,因此,该函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以休眠。但是它不能访问用户空间,因为内核线程在用户空间没有相关的映射。
通常,只有在发生系统调用时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。
对工作进行调度
此时工作已经被创建,可以调度它了。
通过下面这个函数可以调度处理函数,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。
int schedule_work(struct work_struct *work) {
return queue_work(keventd_wq, work);
}
工作者线程被唤醒后,延迟执行:
int schedule_delayed_work(struct delayed_work *dwork, unsigned long delay) {
return queue_delayed_work(keventd_wq, dwork, delay);
}
刷新操作
排入队列的工作会在工作者线程下一次被唤醒的时候执行。在执行之前需要保证一些操作已经完成,因此需要先让队列里的任务先完成,最后在执行新的任务。
void flush_scheduled_work(void) {
flush_workqueue(keventd_wq);
}
/**
* flush_workqueue - 确保任何计划的工作已经运行完成。
* @wq: 要刷新的工作队列
*
* 强制执行工作队列并阻塞直到完成。
* 这通常用于驱动程序关闭处理程序。
*
* 我们会一直休眠到所有在排队的工作都处理完为止,
* 但我们不会被新进来的工作锁住。
*
* 此函数用于运行工作队列本身。 现在我们只需要等待辅助线程来完成它。
*/
void flush_workqueue(struct workqueue_struct *wq) {
const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;
might_sleep();
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
for_each_cpu(cpu, cpu_map)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);
创建新的工作队列
若缺省的队列无法满足需要,就创建一个新的工作队列和与之相应的工作者线程。由于这么做会在每个处理器上都创建一个工作者线程,所以只有在你明确了必须要靠自己的一套线程来提高性能的情况下,再创建自己的工作队列。
创建一个任务队列和与之相关的工作者线程:
#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
name 参数用于该内核线程的命名,例如缺省的 events 队列:
static struct workqueue_struct *keventd_wq;
keventd_wq = create_workqueue("events");
这样就创建了所有的工作者线程(每个处理器各有一个),并做好 work 之前的准备工作。
任务队列与相关工作者线程的绑定:
/**
* queue_work - 工作队列上的队列工作
* @wq: 要使用的工作队列
* @work: 排队工作
*
* 如果@work 已经在队列中,则返回 0,否则返回非零。
*
* 我们将工作排队到提交它的 CPU,但如果 CPU 挂掉,它可以由另一个 CPU 处理。
*/
int queue_work(struct workqueue_struct *wq, struct work_struct *work) {
int ret;
ret = queue_work_on(get_cpu(), wq, work);
put_cpu();
return ret;
}
延迟版:
int queue_delayed_work(struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay) {...}