20 KiB
8.7 设备驱动中的并发控制
并发(concurrency)指的是多个执行单元同时、并行被执行,而并发的执行单元对共享资源(硬件资源和软件上的全局变量、静态变量等)的访问则很容易导致竞态(race conditions)。内核中发生竟态的主要情况有:
- 处理器的多个核心之间
- 多个内核进程之间
- 中断(硬中断、软中断、中断前半部、底半部)与进程之间
解决竞态问题的途径是保证对共享资源的互斥访问,所谓互斥访问是指一个执行单元在访问共享资源的时候,其他的执行单元被禁止访问。访问共享资源的代码区域称为临界区(critical sections),临界区需要以某种互斥机制加以保护。内核中实现互斥的机制有:
- 中断屏蔽
- 原子操作
- 自旋锁
- 信号量
中断屏蔽
local_irq_disable(); // 关中断
... // 临界区代码
local_irq_enable(); // 开中断
local_irq_save(flags); // 关中断并保存当前中断位信息
local_irq_restore(flags); // 开中断并恢复中断位信息
local_bh_disable(); // 禁止中断的底半部
local_bh_enable(); // 使能中断的底半部
要点:关中断时间不可过长,如果有复杂的任务需要处理则应该考虑其他机制实现。
local_irq_disable() 和 local_irq_enable() 都只能禁止和使能本 CPU 核内的中断,因此,并不能解决多 CPU 核心引发的竞态问题。因此,单独使用中断屏蔽通常不是一种值得推荐的避免竞态的方法,它适宜与自旋锁联合使用。
原子操作
整型原子操作
void atomic_set(atomic_t *v, int i); // 设置原子变量的值为 i
atomic_t v = ATOMIC_INIT(0); // 定义原子变量 v 并初始化为 0
atomic_read(atomic_t *v); // 返回原子变量的值
void atomic_add(int i, atomic_t *v); // 原子变量增加 i
void atomic_sub(int i, atomic_t *v); // 原子变量减少 i
void atomic_inc(atomic_t *v); // 原子变量增加 1
void atomic_dec(atomic_t *v); // 原子变量减少 1
int atomic_inc_and_test(atomic_t *v); // 自增后是否为 0
int atomic_dec_and_test(atomic_t *v); // 自减后是否为 0
int atomic_sub_and_test(int i, atomic_t *v); // 减 i 后是否为 0
int atomic_add_return(int i, atomic_t *v); // 进行加操作,并返回新的值
int atomic_sub_return(int i, atomic_t *v); // 进行减操作,并返回新的值
int atomic_inc_return(atomic_t *v); // 进行自增操作,并返回新的值
int atomic_dec_return(atomic_t *v); // 进行自减操作,并返回新的值
位原子操作
void set_bit(nr, void *addr); // 设置 addr 地址的第 nr 位为 1
void clear_bit(nr, void *addr); // 设置 addr 地址的第 nr 位为 0
void change_bit(nr, void *addr); // 对 addr 地址的第 nr 位进行反置
test_bit(nr, void *addr); // 返回 addr 地址的第 nr 位
// 以下 test_and_xxx_bit(nr, void *addr)操作等同于执行 test_bit(nr, void *addr) 后再执行 xxx_bit(nr, void *addr)。
int test_and_set_bit(nr, void *addr);
int test_and_clear_bit(nr, void *addr);
int test_and_change_bit(nr, void *addr);
自旋锁
互斥锁
只允许持有锁的一个对象操作临界区。
spinlock_t spin; // 定义
spin_lock_init(lock); // 该宏用于动态初始化自旋锁
spin_lock(lock); // 该宏用于获得自旋锁 lock,如果能够立即获得锁,它就马上返回,否则,它将自旋在那里,直到该自旋锁的保持者释放。
spin_trylock(lock); // 该宏尝试获得自旋锁 lock,如果能立即获得锁,它获得锁并返回真;否则立即返回假,上不再“在原地等待”。
spin_unlock(lock) // 释放自旋锁
自旋锁可以保证临界区不受别的 CPU 核和本 CPU 核内的抢占进程打扰,但是得到锁的代码路径在执行临界区的时候还可能受到中断和底半部(BH)的影响。为了防止这种影响,就需要用到自旋锁的衍生。
spin_lock_irq(); // = spin_lock() + local_irq_disable()
spin_unlock_irq(); // = spin_unlock() + local_irq_enable()
spin_lock_irqsave(); // = spin_lock() + local_irq_save()
spin_unlock_irqrestore(); // = spin_unlock() + local_irq_restore()
spin_lock_bh(); // = spin_lock() + local_bh_disable()
spin_unlock_bh(); // = spin_unlock() + local_bh_enable()
使用自旋锁需要注意:
- 自旋锁实际上是忙等锁,当锁不可用时,CPU 一直循环执行“测试并设置”该锁直到可用而取得该锁,CPU 在等待自旋锁时不做任何有用的工作,仅仅是等待。因此,只有在占用锁的时间极短的情况下,使用自旋锁才是合理的。当临界区很大或有共享设备的时候,需要较长时间占用锁,使用自旋锁会降低系统的性能。
- 自旋锁可能导致系统死锁。引发这个问题最常见的情况是递归使用一个自旋锁,即如果一个已经拥有某个自旋锁的 CPU 想第二次获得这个自旋锁,则该 CPU 将死锁。此外,如果进程获得自旋锁之后再阻塞,也有可能导致死锁的发生。copy_from_user()、copy_to_user()和 kmalloc()等函数都有可能引起阻塞,因此在自旋锁的占用期间不能调用这些函数。
读写锁
写锁锁定时,只允许持有写锁的对象操作临界区,不允许其他写锁和读锁锁定。写锁非锁定时,多个持有读锁的对象可以同时操作临界区。
// 静态初始化
DEFINE_RWLOCK(my_rwlock);
// 动态初始化
rwlock_t my_rwlock;
rwlock_init(&my_rwlock);
// 读锁定
void read_lock(rwlock_t *lock);
void read_lock_irqsave(rwlock_t *lock, unsigned long flags);
void read_lock_irq(rwlock_t *lock);
void read_lock_bh(rwlock_t *lock);
// 读解锁
void read_unlock(rwlock_t *lock);
void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void read_unlock_irq(rwlock_t *lock);
void read_unlock_bh(rwlock_t *lock);
// 写锁定
void write_lock(rwlock_t *lock);
void write_lock_irqsave(rwlock_t *lock, unsigned long flags);
void write_lock_irq(rwlock_t *lock);
void write_lock_bh(rwlock_t *lock);
int write_trylock(rwlock_t *lock);
// 写解锁
void write_unlock(rwlock_t *lock);
void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void write_unlock_irq(rwlock_t *lock);
void write_unlock_bh(rwlock_t *lock);
顺序锁
写锁锁定时,只允许当前持有写锁的对象和多个持有读锁的对象操作临界区,不允许多个写锁同时锁定。写锁非锁定时,多个持有读锁的对象可以同时操作临界区。
// 静态初始化
DEFINE_SEQLOCK(my_seqlock);
// 动态初始化
seqlock_t my_seqlock;
seqlock_init(&my_seqlock);
// 获得写锁
void write_seqlock(seqlock_t *sl);
int write_tryseqlock(seqlock_t *sl);
write_seqlock_irqsave(lock, flags); // = loal_irq_save() + write_seqlock()
write_seqlock_irq(lock); // = local_irq_disable() + write_seqlock()
write_seqlock_bh(lock); // = local_bh_disable() + write_seqlock()
// 释放写锁
void write_sequnlock(seqlock_t *sl);
write_sequnlock_irqrestore(lock, flags); // = write_sequnlock() + local_irq_restore()
write_sequnlock_irq(lock); // = write_sequnlock() + local_irq_enable()
write_sequnlock_bh(lock); // = write_sequnlock() + local_bh_enable()
// 获得读锁
unsigned read_seqbegin(const seqlock_t *sl);
read_seqbegin_irqsave(lock, flags); // = local_irq_save() + read_seqbegin()
// 重读判断
int read_seqretry(const seqlock_t *sl, unsigned iv);
read_seqretry_irqrestore(lock, iv, flags); // read_seqretry_irqrestore() = read_seqretry() + local_irq_restore()
// 读执行单元使用顺序锁的模式如下:
do{
seqnum = read_seqbegin(&seqlock_a);
// 读操作代码块
...
}while(read_seqretry(&seqlock_a, seqnum));
Read-Copy Update
读-拷贝-更新,缩写为 RCU。对于被 RCU 保护的共享数据结构,读执行单元不需要获得任何锁就可以访问它,不使用原子指令,因此不会导致锁竞争、内存延迟以及流水线停滞,也不存在死锁问题。
使用 RCU 的写执行单元在访问它前需首先复制一个副本,然后对副本进行修改,最后使用一个回调机制在适当的时机把指向原来数据的指针重新指向新的被修改的数据,这个时机就是所有引用该数据的CPU 都退出对共享数据的操作的时候。读执行单元没有任何同步开销,而写执行单元的同步开销则取决于使用的写执行单元间的同步机制。
RCU 可以看做读写锁的高性能版本,相比读写锁,RCU 的优点在于既允许多个读执行单元同时访问被保护的数据,又允许多个读执行单元和多个写执行单元同时访问被保护的数据。
但是,RCU 不能替代读写锁,因为如果写比较多时,对读执行单元的性能提高不能弥补写执行单元导致的损失。因为使用 RCU 时,写执行单元之间的同步开销会比较大,它需要延迟数据结构的释放,复制被修改的数据结构,它也必须使用某种锁机制同步并行的其他写执行单元的修改操作。
// 读锁定
rcu_read_lock();
rcu_read_lock_bh();
// 读解锁
rcu_read_unlock();
rcu_read_unlock_bh();
同步 RCU
synchronize_rcu();
该函数由 RCU 写执行单元调用,它将阻塞写执行单元,直到所有的读执行单元已经完成读执行单元临界区,写执行单元才可以继续下一步操作。如果有多个 RCU 写执行单元调用该函数,它们将在一个 grace period(即所有的读执行单元已经完成对临界区的访问)之后全部被唤醒。synchronize_rcu() 保证所有 CPU 都处理完正在运行的读执行单元临界区。
挂接回调
void fastcall call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
void fastcall call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
函数 call_rcu() 也由 RCU 写执行单元调用,它不会使写执行单元阻塞,因而可以在中断上下文或软中断中使用。该函数将把函数 func 挂接到 RCU 回调函数链上,然后立即返回。函数 synchronize_rcu() 的实现实际上使用了 call_rcu() 函数。
call_ruc_bh() 函数的功能几乎与 call_rcu() 完全相同,唯一差别就是它把软中断的完成也当做经历一个 quiescent state(静默状态),因此如果写执行单元使用了该函数,在进程上下文的读执行单元必须使用 rcu_read_lock_bh()。
使用 RCU 时,读执行单元必须提供一个信号给写执行单元以便写执行单元能够确定数据可以被安全地释放或修改的时机。有一个专门的垃圾收集器来探测读执行单元的信号,一旦所有的读执行单元都已经发送信号告知它们都不在使用被 RCU 保护的数据结构,垃圾收集器就调用回调函数完成最后的数据释放或修改操作。
信号量
信号量(semaphore)是用于保护临界区的一种常用方法,它的使用方式和自旋锁类似。与自旋锁相同,只有得到信号量的进程才能执行临界区代码。但是,与自旋锁不同的是,当获取不到信号量时,进程不会原地打转而是进入休眠等待状态。
信号量的基本用法
定义和初始化信号量方法如下:
// 定义名称为 sem 的信号量
struct semaphore sem;
// 初始化信号量,并设置信号量 sem 的值为 val。尽管信号量可以被初始化为大于 1 的值从而成为一个计数信号量,但是它通常不被这样使用。
void sema_init(struct semaphore *sem, int val);
// 该函数用于初始化一个用于互斥的信号量,它把信号量 sem 的值设置为 1,等同于sema_init(struct semaphore *sem, 1)
void init_MUTEX(struct semaphore *sem);
// 该函数也用于初始化一个信号量,但它把信号量 sem 的值设置为 0,等同于 sema_init(struct semaphore* sem, 0)
void init_MUTEX_LOCKED(struct semaphore *sem);
// 定义一个名为 name 的信号量并初始化为 1
DECLARE_MUTEX(name)
// 定义一个名为 name 的信号量并初始化为 0
DECLARE_MUTEX_LOCKED(name)
获取信号量方法如下:
// 该函数用于获得信号量 sem,它会导致睡眠,因此不能在中断上下文使用。
void down(struct semaphore * sem);
// 因为 down_interruptible() 而进入睡眠状态的进程可以被信号打断,信号会导致该函数返回,返回值为 -EINTR
// 在使用 down_interruptible() 获取信号量时,对返回值一般会进行检查,如果非 0,通常立即返回 -ERESTARTSYS
int down_interruptible(struct semaphore * sem);
// 该函数尝试获得信号量 sem,如果能够立刻获得,它就获得该信号量并返回 0,否则,返回非 0 值。它不会导致调用者睡眠,可以在中断上下文使用。
int down_trylock(struct semaphore * sem);
使用可被中断的信号量版本的意思是,万一出现了 semaphore 的死锁,还有机会用 Ctrl+C 发出软中断,让等待这个内核驱动返回的用户态进程退出。而不是把整个系统都锁住了。在休眠时,能被中断信号终止,这个进程是可以接受中断信号的!比如你在命令行中输入:
sleep 10000
按下 Ctrl+C,就给上面的进程发送了进程终止信号。信号发送给用户空间,然后通过系统调用,会把这个信号传给递给驱动,进而避免无限期等待下去。
释放信号量方法如下:
void up(struct semaphore * sem);
如果信号量被初始化为 0,则它可以用于同步,同步意味着一个执行单元的继续执行需等待另一执行单元完成某事,保证执行的先后顺序。
完成量(completion)
Linux 系统提供了一种比信号量更好的同步机制,即完成量,它用于一个执行单元等待另一个执行单元执行完某事。
// 定义完成量。
struct completion my_completion;
// 初始化完成量。
init_completion(&my_completion);
// 定义并初始化完成量。
DECLARE_COMPLETION(my_completion);
// 等待一个 completion 被唤醒。
void wait_for_completion(struct completion *c);
// 唤醒一个等待的执行单元。
void complete(struct completion *c);
// 唤醒所有等待的执行单元。
void complete_all(struct completion *c);
自旋锁与信号量的对比
信号量是进程级的,用于多个进程之间对资源的互斥,虽然也是在内核中,但是该内核执行路径是以进程的身份,代表进程来争夺资源的。如果竞争失败,会发生进程上下文切换,当前进程进入睡眠状态,CPU 将运行其他进程。鉴于进程上下文切换的开销也很大,因此,只有当进程占用资源时间较长时,用信号量才是较好的选择。
当所要保护的临界区访问时间比较短时,用自旋锁是非常方便的,因为它节省上下文切换的时间。但是 CPU 得不到自旋锁会在那里空转直到其他执行单元解锁为止,所以要求锁不能在临界区里长时间停留,否则会降低系统的效率。
信号量所保护的临界区可包含可能引起阻塞的代码,而自旋锁则绝对要避免用来保护包含这样代码的临界区。因为阻塞意味着要进行进程的切换,如果进程被切换出去后,另一个进程企图获取本自旋锁,死锁就会发生。
信号量存在于进程上下文,因此,如果被保护的共享资源需要在中断或软中断情况下使用,则在信号量和自旋锁之间只能选择自旋锁。当然,如果一定要使用信号量,则只能通过 down_trylock() 方式进行,不能获取就立即返回以避免阻塞。
读写信号量
读写信号量与信号量的关系与读写自旋锁和自旋锁的关系类似,读写信号量可能引起进程阻塞,但它可允许 N 个读执行单元同时访问共享资源,而最多只能有一个写执行单元。
// 定义读写信号量
struct rw_semaphore my_rws;
// 初始化读写信号量
void init_rwsem(struct rw_semaphore *sem);
// 获取读信号量
void down_read(struct rw_semaphore *sem);
int down_read_trylock(struct rw_semaphore *sem);
// 释放读信号量
void up_read(struct rw_semaphore *sem);
// 获取写信号量
void down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
// 释放写信号量
void up_write(struct rw_semaphore *sem);
互斥体
尽管信号量已经可以实现互斥的功能,而且包含 DECLARE_MUTEX()、init_MUTEX() 等定义信号量的宏或函数,从名字上看就体现出了互斥体的概念,但是 mutex 在 Linux 内核中还是真实地存在的。
// 码定义名为互斥体
struct mutex my_mutex;
// 初始化互斥体
mutex_init(&my_mutex);
// 获取互斥体,不能被信号打断
void fastcall mutex_lock(struct mutex *lock);
// 获取互斥体,可以被信号打断
int fastcall mutex_lock_interruptible(struct mutex *lock);
// 尝试获取互斥体,获取失败立即返回
int fastcall mutex_trylock(struct mutex *lock);
// 释放互斥体
void fastcall mutex_unlock(struct mutex *lock);
并发控制示例
struct demo_dev
{
struct cdev cdev;
struct device *dev;
struct semaphore demo_sem;
// This is a test.
char demo_text[DEMO_DATA_SIZE];
};
static ssize_t demo_read(struct file *filp, char __user *buffer, size_t count, loff_t *position)
{
struct demo_dev *devp = filp->private_data;
loff_t p;
ssize_t ret = 0;
down(&devp->demo_sem);
p = *position;
// This is a test.
// 分析和获取有效的读长度
if(DEMO_DATA_SIZE<=p) { // 要读的偏移位置越界
up(&devp->demo_sem);
return 0; // End of a file
}
if(DEMO_DATA_SIZE<(count+p)) // 要读的字节数太大
count = DEMO_DATA_SIZE-p;
if(copy_to_user((void*)buffer, &devp->demo_text[p], count))
ret = -EFAULT;
else
{
*position += count
ret = count;
}
up(&devp->demo_sem);
return ret;
}
static ssize_t demo_write(struct file *filp, const char __user *buffer, size_t count, loff_t *position)
{
struct demo_dev *devp = filp->private_data;
loff_t p;
ssize_t ret = 0;
down(&devp->demo_sem);
p = *position;
// This is a test.
// 分析和获取有效的写长度
if(DEMO_DATA_SIZE<=p) { // 要读的偏移位置越界
up(&devp->demo_sem);
return 0; // End of a file
}
if(DEMO_DATA_SIZE<(count+p)) // 要读的字节数太大
count = DEMO_DATA_SIZE-p;
if(copy_from_user(&devp->demo_text[p], (void*)buffer, count))
ret = -EFAULT;
else {
*position += count;
ret = count;
}
up(&devp->demo_sem);
return ret;
}
static int demo_setup_cdev(struct demo_dev *devp, int index)
{
char name[16];
int err, devno = MKDEV(demo_major, index);
cdev_init(&devp->cdev, &demo_fops);
devp->cdev.owner = THIS_MODULE;
err = cdev_add(&devp->cdev, devno, 1);
if(err)
{
printk(KERN_ERR "demo add cdev:%d error:%d.\r\n", index, err);
goto out_cdev;
}
// 创建设备节点
memset(name, 0, 16);
sprintf(name, DEMO_MODULE_NAME"%d", index);
printk(KERN_INFO "demo new dev name:%s", name);
devp->dev = device_create(class, NULL, devp->cdev.dev, NULL, name);
// This is a test.
devp->demo_text[DEMO_DATA_SIZE-2] = '\n';
devp->demo_text[DEMO_DATA_SIZE-1] = 0;
sprintf(devp->demo_text, "%d", index);
sema_init(&devp->demo_sem, 1);
return 0;
out_cdev:
cdev_del(&devp->cdev);
kfree(devp);
return err;
}