利用源代码深入分析 Linux 互斥机制
【深入理解Linux内核锁】七、互斥体
尽管信号量已经可以实现互斥的功能,但是“正宗”的mutex
在Linux
内核中还是真实地存在着。尤其是在Linux
内核代码中,更多能看到mutex
的身影。
1、互斥体API
struct mutex my_mutex; // 定义互斥体
mutex_init(&my_mutex); // 初始化互斥体
/* 获取互斥体 */
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_trylock(struct mutex *lock);
void mutex_unlock(struct mutex *lock); // 释放互斥体
2、API实现
2.1 mutex
/*
* Simple, straightforward mutexes with strict semantics:
*
* - only one task can hold the mutex at a time
* - only the owner can unlock the mutex
* - multiple unlocks are not permitted
* - recursive locking is not permitted
* - a mutex object must be initialized via the API
* - a mutex object must not be initialized via memset or copying
* - task may not exit with mutex held
* - memory areas where held locks reside must not be freed
* - held mutexes must not be reinitialized
* - mutexes may not be used in hardware or software interrupt
* contexts such as tasklets and timers
*
* These semantics are fully enforced when DEBUG_MUTEXES is
* enabled. Furthermore, besides enforcing the above rules, the mutex
* debugging code also implements a number of additional features
* that make lock debugging easier and faster:
*
* - uses symbolic names of mutexes, whenever they are printed in debug output
* - point-of-acquire tracking, symbolic lookup of function names
* - list of all locks held in the system, printout of them
* - owner tracking
* - detects self-recursing locks and prints out all relevant info
* - detects multi-task circular deadlocks and prints out all affected
* locks and tasks (and only those tasks)
*/
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
结构体名称:mutex
文件位置:include/linux/mutex.h
主要作用:互斥锁结构体,用于定义一个互斥锁
-
atomic_long_t owner
:原子变量,表示互斥锁当前的持有者,可以安全地被多个线程同时访问,而不会导致数据破坏。 -
spinlock_t wait_lock
:一个自旋锁,用于保护涉及wait_list
的临界区 -
struct list_head wait_list
:一个链表头,用于维护等待互斥锁释放的线程列表。
2.2 mutex_init
/**
* mutex_init - initialize the mutex
* @mutex: the mutex to be initialized
*
* Initialize the mutex to unlocked state.
*
* It is not allowed to initialize an already locked mutex.
*/
#define mutex_init(mutex) \
do { \
static struct lock_class_key __key; \
\
__mutex_init((mutex), #mutex, &__key); \
} while (0)
void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_long_set(&lock->owner, 0);
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);
#endif
debug_mutex_init(lock, name, key);
}
EXPORT_SYMBOL(__mutex_init);
结构体名称:mutex_init
文件位置:include/linux/mutex.h
主要作用:初始化互斥体
相关实现:
-
mutex_init
调用__mutex_init
接口,实现互斥锁的初始化 -
atomic_long_set
:初始化原子变量,设置值为0 -
spin_lock_init
:初始化自旋锁,设置其为UNLOCK
-
INIT_LIST_HEAD
:初始化链表 -
debug_mutex_init
:初始化调试相关信息
2.3 mutex_lock
/**
* mutex_lock - acquire the mutex
* @lock: the mutex to be acquired
*
* Lock the mutex exclusively for this task. If the mutex is not
* available right now, it will sleep until it can get it.
*
* The mutex must later on be released by the same task that
* acquired it. Recursive locking is not allowed. The task
* may not exit without first unlocking the mutex. Also, kernel
* memory where the mutex resides must not be freed with
* the mutex still locked. The mutex must first be initialized
* (or statically defined) before it can be locked. memset()-ing
* the mutex to 0 is not allowed.
*
* (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
* checks that will enforce the restrictions and will also do
* deadlock debugging)
*
* This function is similar to (but not equivalent to) down().
*/
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
if (!__mutex_trylock_fast(lock))
__mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);
#endif
# define might_sleep() do { might_resched(); } while (0)
/*
* Optimistic trylock that only works in the uncontended case. Make sure to
* follow with a __mutex_trylock() before failing.
*/
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
unsigned long curr = (unsigned long)current;
unsigned long zero = 0UL;
if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
return true;
return false;
}
static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip)
{
return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
/*
* Lock a mutex (possibly interruptible), slowpath:
*/
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
bool first = false;
struct ww_mutex *ww;
int ret;
might_sleep();
ww = container_of(lock, struct ww_mutex, base);
if (use_ww_ctx && ww_ctx) {
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;
/*
* Reset the wounded flag after a kill. No other process can
* race and wound us here since they can't have a valid owner
* pointer if we don't have any locks held.
*/
if (ww_ctx->acquired == 0)
ww_ctx->wounded = 0;
}
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_set_context_fastpath(ww, ww_ctx);
preempt_enable();
return 0;
}
spin_lock(&lock->wait_lock);
/*
* After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) {
if (use_ww_ctx && ww_ctx)
__ww_mutex_check_waiters(lock, ww_ctx);
goto skip_wait;
}
debug_mutex_lock_common(lock, &waiter);
lock_contended(&lock->dep_map, ip);
if (!use_ww_ctx) {
/* add waiting tasks to the end of the waitqueue (FIFO): */
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
#ifdef CONFIG_DEBUG_MUTEXES
waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
} else {
/*
* Add in stamp order, waking up waiters that must kill
* themselves.
*/
ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
if (ret)
goto err_early_kill;
waiter.ww_ctx = ww_ctx;
}
waiter.task = current;
set_current_state(state);
for (;;) {
/*
* Once we hold wait_lock, we're serialized against
* mutex_unlock() handing the lock off to us, do a trylock
* before testing the error conditions to make sure we pick up
* the handoff.
*/
if (__mutex_trylock(lock))
goto acquired;
/*
* Check for signals and kill conditions while holding
* wait_lock. This ensures the lock cancellation is ordered
* against mutex_unlock() and wake-ups do not go missing.
*/
if (unlikely(signal_pending_state(state, current))) {
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx) {
ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
if (ret)
goto err;
}
spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();
/*
* ww_mutex needs to always recheck its position since its waiter
* list is not FIFO ordered.
*/
if ((use_ww_ctx && ww_ctx) || !first) {
first = __mutex_waiter_is_first(lock, &waiter);
if (first)
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
}
set_current_state(state);
/*
* Here we order against unlock; we must either see it change
* state back to RUNNING and fall through the next schedule(),
* or we must see its unlock and acquire.
*/
if (__mutex_trylock(lock) ||
(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
break;
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
acquired:
__set_current_state(TASK_RUNNING);
if (use_ww_ctx && ww_ctx) {
/*
* Wound-Wait; we stole the lock (!first_waiter), check the
* waiters as anyone might want to wound us.
*/
if (!ww_ctx->is_wait_die &&
!__mutex_waiter_is_first(lock, &waiter))
__ww_mutex_check_waiters(lock, ww_ctx);
}
mutex_remove_waiter(lock, &waiter, current);
if (likely(list_empty(&lock->wait_list)))
__mutex_clear_flag(lock, MUTEX_FLAGS);
debug_mutex_free_waiter(&waiter);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_lock_acquired(ww, ww_ctx);
spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;
err:
__set_current_state(TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
}
结构体名称:mutex_lock
文件位置:kernel/locking/mutex.c
主要作用:用于获取一个互斥锁
调用流程:
mutex_lock(kernel/locking/mutex.c)
|--> might_sleep
|--> __mutex_trylock_fast
|--> atomic_long_try_cmpxchg_acquire // 尝试获取互斥锁
|--> __mutex_lock_slowpath
|--> __mutex_lock
|--> __mutex_lock_common // 获取锁并等待
相关实现:
might_sleep
:用于标记可能在非原子上下文(允许睡眠)中执行的操作,以确保不会在不允许睡眠的上下文中调用这段代码。
__mutex_trylock_fast
:尝试获取互斥锁(mutex),如果获取成功则范围true
,获取失败,返回false
获取失败后,调用__mutex_lock_slowpath
接口,最终调用__mutex_lock_common
接口,该接口才是重头戏
__mutex_lock_common
接口主要有几个作用:死锁避免策略、
下面我们看第一部分:死锁避免策略
struct ww_mutex *ww;
int ret;
might_sleep();
ww = container_of(lock, struct ww_mutex, base);
if (use_ww_ctx && ww_ctx) {
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;
/*
* Reset the wounded flag after a kill. No other process can
* race and wound us here since they can't have a valid owner
* pointer if we don't have any locks held.
*/
if (ww_ctx->acquired == 0)
ww_ctx->wounded = 0;
}
- 这段代码片段是针对带有死锁避免机制
(Wound-Wait,WW)
的互斥锁的操作,是一种多线程环境中避免死锁的策略。让我们逐行分析这段代码的作用 -
struct ww_mutex *ww
:声明了一个指向struct ww_mutex
类型的指针ww
,这是一种带有死锁避免机制的互斥锁的数据结构。 -
might_sleep
:用于标记可能在非原子上下文(允许睡眠)中执行的操作,以确保不会在不允许睡眠的上下文中调用这段代码。 -
ww = container_of(lock, struct ww_mutex, base)
:使用container_of
宏将传递进来的lock
指针转换为struct ww_mutex
结构体的指针。这意味着lock
指向的是struct ww_mutex
结构体中的base
成员。这样可以将更高级别的数据结构与较低级别的数据结构关联起来。 -
if (use_ww_ctx && ww_ctx)
:如果use_ww_ctx
为真且ww_ctx
不为空(即使用死锁避免上下文并且上下文存在)。 -
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
:检查是否已经在同一个上下文中尝试获取锁。如果是,则返回错误码-EALREADY
,表示已经在当前上下文中获取了锁。(避免死锁) -
if (ww_ctx->acquired == 0)
:检查是否当前上下文已经没有持有任何锁。如果是,则将ww_ctx
的wounded
标志重置为0。
下面我们看第二部分:获取锁阶段
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_set_context_fastpath(ww, ww_ctx);
preempt_enable();
return 0;
}
-
preempt_disable
:禁用抢占,确保在获取锁期间不会被抢占。 - 通过调用
__mutex_trylock
和mutex_optimistic_spin
多次尝试获取锁,如果获取锁成功,说明没有其他进程占用,直接返回,否则接着往下执行 -
ww_mutex_set_context_fastpath(ww, ww_ctx)
:如果使用了"wait-wound"上下文,这个函数会将上下文设置为快速路径,以便在解锁时进行优化。 -
preempt_enable()
:重新启用抢占,允许系统在后续操作中进行抢占。
下面我们看第三部分:将等待进程置入睡眠状态
spin_lock(&lock->wait_lock);
/*
* After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) {
if (use_ww_ctx && ww_ctx)
__ww_mutex_check_waiters(lock, ww_ctx);
goto skip_wait;
}
debug_mutex_lock_common(lock, &waiter);
lock_contended(&lock->dep_map, ip);
if (!use_ww_ctx) {
/* add waiting tasks to the end of the waitqueue (FIFO): */
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
#ifdef CONFIG_DEBUG_MUTEXES
waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
} else {
/*
* Add in stamp order, waking up waiters that must kill
* themselves.
*/
ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
if (ret)
goto err_early_kill;
waiter.ww_ctx = ww_ctx;
}
waiter.task = current;
set_current_state(state);
- 如果不使用"wait-wound"上下文,会将等待任务添加到等待队列的末尾(FIFO顺序)。
- 如果使用"wait-wound"上下文,会按照时间戳的顺序将等待任务添加到等待队列,并可能唤醒需要终止的等待者。
-
spin_lock(&lock->wait_lock)
:获取wait_lock
自旋锁,这是为了在等待期间操作等待队列时防止竞争。 -
if (__mutex_trylock(lock))
:再次尝试获取锁,如果成功获取,说明此时没有竞争,会跳到acquired
标签。 - 根据是否使用"wait-wound"上下文进行不同的等待队列操作:
-
set_current_state(state)
:设置等待任务的上下文和当前状态
下面我们看第四部分:等待状态处理
for (;;) {
/*
* Once we hold wait_lock, we're serialized against
* mutex_unlock() handing the lock off to us, do a trylock
* before testing the error conditions to make sure we pick up
* the handoff.
*/
if (__mutex_trylock(lock))
goto acquired;
/*
* Check for signals and kill conditions while holding
* wait_lock. This ensures the lock cancellation is ordered
* against mutex_unlock() and wake-ups do not go missing.
*/
if (unlikely(signal_pending_state(state, current))) {
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx) {
ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
if (ret)
goto err;
}
spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();
/*
* ww_mutex needs to always recheck its position since its waiter
* list is not FIFO ordered.
*/
if ((use_ww_ctx && ww_ctx) || !first) {
first = __mutex_waiter_is_first(lock, &waiter);
if (first)
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
}
set_current_state(state);
/*
* Here we order against unlock; we must either see it change
* state back to RUNNING and fall through the next schedule(),
* or we must see its unlock and acquire.
*/
if (__mutex_trylock(lock) ||
(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
break;
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
acquired:
__set_current_state(TASK_RUNNING);
if (use_ww_ctx && ww_ctx) {
/*
* Wound-Wait; we stole the lock (!first_waiter), check the
* waiters as anyone might want to wound us.
*/
if (!ww_ctx->is_wait_die &&
!__mutex_waiter_is_first(lock, &waiter))
__ww_mutex_check_waiters(lock, ww_ctx);
}
mutex_remove_waiter(lock, &waiter, current);
if (likely(list_empty(&lock->wait_list)))
__mutex_clear_flag(lock, MUTEX_FLAGS);
debug_mutex_free_waiter(&waiter);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_lock_acquired(ww, ww_ctx);
spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;
err:
__set_current_state(TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
这部分代码是在等待期间的处理:
如果获取到锁,跳转到
acquired
标签。
-
__set_current_state(TASK_RUNNING)
:设置当前线程的状态为TASK_RUNNING
。 - 如果使用"wait-wound"上下文,检查是否需要继续检查等待者。
-
mutex_remove_waiter
:从等待队列中移除等待任务。 -
list_empty
:判断是否等待队列为空,如果是,则清除锁的标志。 - 实时调用
__mutex_trylock
判断是否获取到锁,如果成功获取,跳转到acquired
标签。 -
signal_pending_state
:检查是否有信号到达,如果有,返回错误码-EINTR
。 - 如果使用"wait-wound"上下文,检查是否需要终止等待者。
-
spin_unlock(&lock->wait_lock)
:解锁wait_lock
,禁用抢占并进入调度等待状态。 -
schedule_preempt_disabled
:进入调度等待状态 -
set_current_state
:设置当前线程的状态为等待状态。 -
__mutex_trylock
和mutex_optimistic_spin
:尝试获取锁,或者如果是第一个等待任务且可以进行自旋优化,则尝试自旋等待。 -
spin_lock(&lock->wait_lock)
:获取锁,继续自旋等待
这段代码是内核中用于互斥锁获取的一个复杂实现,其中包含了自旋、等待队列、信号处理和死锁避免等多个关键概念和操作,它旨在在高并发的多线程环境中提供高性能的互斥锁实现。
2.4 mutex_unlock
/**
* mutex_unlock - release the mutex
* @lock: the mutex to be released
*
* Unlock a mutex that has been locked by this task previously.
*
* This function must not be used in interrupt context. Unlocking
* of a not locked mutex is not allowed.
*
* This function is similar to (but not equivalent to) up().
*/
void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOC
if (__mutex_unlock_fast(lock))
return;
#endif
__mutex_unlock_slowpath(lock, _RET_IP_);
}
EXPORT_SYMBOL(mutex_unlock);
/*
* Release the lock, slowpath:
*/
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
struct task_struct *next = NULL;
DEFINE_WAKE_Q(wake_q);
unsigned long owner;
mutex_release(&lock->dep_map, 1, ip);
/*
* Release the lock before (potentially) taking the spinlock such that
* other contenders can get on with things ASAP.
*
* Except when HANDOFF, in that case we must not clear the owner field,
* but instead set it to the top waiter.
*/
owner = atomic_long_read(&lock->owner);
for (;;) {
unsigned long old;
#ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif
if (owner & MUTEX_FLAG_HANDOFF)
break;
old = atomic_long_cmpxchg_release(&lock->owner, owner,
__owner_flags(owner));
if (old == owner) {
if (owner & MUTEX_FLAG_WAITERS)
break;
return;
}
owner = old;
}
spin_lock(&lock->wait_lock);
debug_mutex_unlock(lock);
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_first_entry(&lock->wait_list,
struct mutex_waiter, list);
next = waiter->task;
debug_mutex_wake_waiter(lock, waiter);
wake_q_add(&wake_q, next);
}
if (owner & MUTEX_FLAG_HANDOFF)
__mutex_handoff(lock, next);
spin_unlock(&lock->wait_lock);
wake_up_q(&wake_q);
}
结构体名称:mutex_unlock
文件位置:kernel/locking/mutex.c
主要作用:用于释放一个互斥锁,并唤醒等待这个锁的任务
调用流程:
mutex_unlock(kernel/locking/mutex.c)
|--> __mutex_unlock_slowpath
|--> DEFINE_WAKE_Q // 定义唤醒队列
|--> atomic_long_read // 获取原子变量,即获取当前锁的持有者
|--> atomic_long_cmpxchg_release // 判断ock->owner和owner是否相等,如果有,则跳出循环,没有则直接返回
|--> spin_lock // 处理等待队列
|--> list_empty // 判断等待队列是否为空
|--> list_first_entry // 获取等待队列的第一个任务
|--> wake_q_add // 加入到唤醒队列
|--> spin_unlock
实现流程:
mutex_unlock
主要有两个步骤,其一是释放互斥锁,其二是唤醒等待队列的任务,核心实现在__mutex_unlock_slowpath
__mutex_unlock_slowpath
中主要负责实现这两个工作,第一个释放锁部分如下:
/*
* Release the lock before (potentially) taking the spinlock such that
* other contenders can get on with things ASAP.
*
* Except when HANDOFF, in that case we must not clear the owner field,
* but instead set it to the top waiter.
*/
owner = atomic_long_read(&lock->owner);
for (;;) {
unsigned long old;
#ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif
if (owner & MUTEX_FLAG_HANDOFF)
break;
old = atomic_long_cmpxchg_release(&lock->owner, owner,
__owner_flags(owner));
if (old == owner) {
if (owner & MUTEX_FLAG_WAITERS)
break;
return;
}
owner = old;
}
- 调用
atomic_long_read
来获取当前锁的拥有者 - 然后调用
atomic_long_cmpxchg_release
来比较当前锁的拥有者是否与之前获取的相同,以此来判断是否有其他线程操作了锁 -
old == owner
:如果相同,则表示没有其他线程获取锁;如果有,则将owner = old
旧的锁的持有者赋值给当前继续判断。 - 进而判断是否有等待者
if (owner & MUTEX_FLAG_WAITERS)
,如果有,就退出循环,执行下面的唤醒等待者的任务,如果没有就直接return
__mutex_unlock_slowpath
中第二个唤醒等待线程部分如下:
spin_lock(&lock->wait_lock);
debug_mutex_unlock(lock);
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_first_entry(&lock->wait_list,
struct mutex_waiter, list);
next = waiter->task;
debug_mutex_wake_waiter(lock, waiter);
wake_q_add(&wake_q, next);
}
if (owner & MUTEX_FLAG_HANDOFF)
__mutex_handoff(lock, next);
spin_unlock(&lock->wait_lock);
wake_up_q(&wake_q);
- 开始调用
spin_lock
和spin_unlock
来避免线程的并发,保护临界资源 -
if (!list_empty(&lock->wait_list))
:判断等待队列是否为空 -
list_first_entry
:获取等待队列的第一个任务 -
wake_q_add
:将该任务加入到唤醒队列中 -
if (owner & MUTEX_FLAG_HANDOFF)
:判断当前锁是否被标记为HANDOFF
。如果是,它会调用__mutex_handoff
函数,将锁的所有权交给等待者。 -
wake_up_q
:唤醒等待队列
互斥锁的操作确实比较复杂,其底层牵涉到了原子操作,自旋锁,唤醒队列等操作
3、自旋锁与互斥体区别
实现上的区别:
- 自旋锁:当锁不能获取到时,一直在原地自旋等待,直到锁变为可用,自旋锁等待期间,不会让线程处于睡眠状态,而是一直忙等。
- 互斥锁:当锁不能获取到时,将该线程直接置入睡眠状态,直到互斥体可用被唤醒。
时间开销:
- 自旋锁:在等待锁的过程中,自旋锁所浪费的时间为等待获取锁的时间+执行临界区的时间
- 互斥锁:其时间开销为等待锁的时间+进程上下文切换的时间+执行临界区的时间
总结:如果临界区很小,锁的持有时间非常短,那么使用自旋锁可能更为高效。反之,如果临界区大,锁的持有时间较长,或者涉及到I/O操作等可能导致线程睡眠的操作,那么使用互斥体可能更为合适。
嵌入式艺术
下一篇: 透彻理解这三个核心是学习并发编程的关键
推荐阅读
-
利用源代码深入分析 Linux 互斥机制
-
Linux 中的同步和互斥机制
-
windows下进程间通信的(13种方法)-摘 要 本文讨论了进程间通信与应用程序间通信的含义及相应的实现技术,并对这些技术的原理、特性等进行了深入的分析和比较。 ---- 关键词 信号 管道 消息队列 共享存储段 信号灯 远程过程调用 Socket套接字 MQSeries 1 引言 ---- 进程间通信的主要目的是实现同一计算机系统内部的相互协作的进程之间的数据共享与信息交换,由于这些进程处于同一软件和硬件环境下,利用操作系统提供的的编程接口,用户可以方便地在程序中实现这种通信;应用程序间通信的主要目的是实现不同计算机系统中的相互协作的应用程序之间的数据共享与信息交换,由于应用程序分别运行在不同计算机系统中,它们之间要通过网络之间的协议才能实现数据共享与信息交换。进程间通信和应用程序间通信及相应的实现技术有许多相同之处,也各有自己的特色。即使是同一类型的通信也有多种的实现方法,以适应不同情况的需要。 ---- 为了充分认识和掌握这两种通信及相应的实现技术,本文将就以下几个方面对这两种通信进行深入的讨论:问题的由来、解决问题的策略和方法、每种方法的工作原理和实现、每种实现方法的特点和适用的范围等。 2 进程间的通信及其实现技术 ---- 用户提交给计算机的任务最终都是通过一个个的进程来完成的。在一组并发进程中的任何两个进程之间,如果都不存在公共变量,则称该组进程为不相交的。在不相交的进程组中,每个进程都独立于其它进程,它的运行环境与顺序程序一样,而且它的运行环境也不为别的进程所改变。运行的结果是确定的,不会发生与时间相关的错误。 ---- 但是,在实际中,并发进程的各个进程之间并不是完全互相独立的,它们之间往往存在着相互制约的关系。进程之间的相互制约关系表现为两种方式: ---- (1) 间接相互制约:共享CPU ---- (2) 直接相互制约:竞争和协作 ---- 竞争——进程对共享资源的竞争。为保证进程互斥地访问共享资源,各进程必须互斥地进入各自的临界段。 ---- 协作——进程之间交换数据。为完成一个共同任务而同时运行的一组进程称为同组进程,它们之间必须交换数据,以达到协作完成任务的目的,交换数据可以通知对方可以做某事或者委托对方做某事。 ---- 共享CPU问题由操作系统的进程调度来实现,进程间的竞争和协作由进程间的通信来完成。进程间的通信一般由操作系统提供编程接口,由程序员在程序中实现。UNIX在这个方面可以说最具特色,它提供了一整套进程间的数据共享与信息交换的处理方法——进程通信机制(IPC)。因此,我们就以UNIX为例来分析进程间通信的各种实现技术。 ---- 在UNIX中,文件(File)、信号(Signal)、无名管道(Unnamed Pipes)、有名管道(FIFOs)是传统IPC功能;新的IPC功能包括消息队列(Message queues)、共享存储段(Shared memory segment)和信号灯(Semapores)。 ---- (1) 信号 ---- 信号机制是UNIX为进程中断处理而设置的。它只是一组预定义的值,因此不能用于信息交换,仅用于进程中断控制。例如在发生浮点错、非法内存访问、执行无效指令、某些按键(如ctrl-c、del等)等都会产生一个信号,操作系统就会调用有关的系统调用或用户定义的处理过程来处理。 ---- 信号处理的系统调用是signal,调用形式是: ---- signal(signalno,action) ---- 其中,signalno是规定信号编号的值,action指明当特定的信号发生时所执行的动作。 ---- (2) 无名管道和有名管道 ---- 无名管道实际上是内存中的一个临时存储区,它由系统安全控制,并且独立于创建它的进程的内存区。管道对数据采用先进先出方式管理,并严格按顺序操作,例如不能对管道进行搜索,管道中的信息只能读一次。 ---- 无名管道只能用于两个相互协作的进程之间的通信,并且访问无名管道的进程必须有共同的祖先。 ---- 系统提供了许多标准管道库函数,如: pipe——打开一个可以读写的管道; close——关闭相应的管道; read——从管道中读取字符; write——向管道中写入字符; ---- 有名管道的操作和无名管道类似,不同的地方在于使用有名管道的进程不需要具有共同的祖先,其它进程,只要知道该管道的名字,就可以访问它。管道非常适合进程之间快速交换信息。 ---- (3) 消息队列(MQ) ---- 消息队列是内存中独立于生成它的进程的一段存储区,一旦创建消息队列,任何进程,只要具有正确的的访问权限,都可以访问消息队列,消息队列非常适合于在进程间交换短信息。 ---- 消息队列的每条消息由类型编号来分类,这样接收进程可以选择读取特定的消息类型——这一点与管道不同。消息队列在创建后将一直存在,直到使用msgctl系统调用或iqcrm -q命令删除它为止。 ---- 系统提供了许多有关创建、使用和管理消息队列的系统调用,如: ---- int msgget(key,flag)——创建一个具有flag权限的MQ及其相应的结构,并返回一个唯一的正整数msqid(MQ的标识符); ---- int msgsnd(msqid,msgp,msgsz,msgtyp,flag)——向队列中发送信息; ---- int msgrcv(msqid,cmd,buf)——从队列中接收信息; ---- int msgctl(msqid,cmd,buf)——对MQ的控制操作; ---- (4) 共享存储段(SM) ---- 共享存储段是主存的一部分,它由一个或多个独立的进程共享。各进程的数据段与共享存储段相关联,对每个进程来说,共享存储段有不同的虚拟地址。系统提供的有关SM的系统调用有: ---- int shmget(key,size,flag)——创建大小为size的SM段,其相应的数据结构名为key,并返回共享内存区的标识符shmid; ---- char shmat(shmid,address,flag)——将当前进程数据段的地址赋给shmget所返回的名为shmid的SM段; ---- int shmdr(address)——从进程地址空间删除SM段; ---- int shmctl (shmid,cmd,buf)——对SM的控制操作; ---- SM的大小只受主存限制,SM段的访问及进程间的信息交换可以通过同步读写来完成。同步通常由信号灯来实现。SM非常适合进程之间大量数据的共享。 ---- (5) 信号灯 ---- 在UNIX中,信号灯是一组进程共享的数据结构,当几个进程竞争同一资源时(文件、共享内存或消息队列等),它们的操作便由信号灯来同步,以防止互相干扰。 ---- 信号灯保证了某一时刻只有一个进程访问某一临界资源,所有请求该资源的其它进程都将被挂起,一旦该资源得到释放,系统才允许其它进程访问该资源。信号灯通常配对使用,以便实现资源的加锁和解锁。 ---- 进程间通信的实现技术的特点是:操作系统提供实现机制和编程接口,由用户在程序中实现,保证进程间可以进行快速的信息交换和大量数据的共享。但是,上述方式主要适合在同一台计算机系统内部的进程之间的通信。 3 应用程序间的通信及其实现技术 ---- 同进程之间的相互制约一样,不同的应用程序之间也存在竞争和协作的关系。UNIX操作系统也提供一些可用于应用程序之间实现数据共享与信息交换的编程接口,程序员可以通过自己编程来实现。如远程过程调用和基于TCP/IP协议的套接字(Socket)编程。但是,相对普通程序员来说,它们涉及的技术比较深,编程也比较复杂,实现起来困难较大。 ---- 于是,一种新的技术应运而生——通过将有关通信的细节完全掩盖在某个独立软件内部,即底层的通讯工作和相应的维护管理工作由该软件内部来实现,用户只需要将通信任务提交给该软件去完成,而不必理会它的具体工作过程——这就是所谓的中间件技术。 ---- 我们在这里分别讨论这三种常用的应用程序间通信的实现技术——远程过程调用、会话编程技术和MQSeries消息队列技术。其中远程过程调用和会话编程属于比较低级的方式,程序员参与的程度较深,而MQSeries消息队列则属于比较高级的方式,即中间件方式,程序员参与的程度较浅。 ---- 4.1 远程过程调用(RPC)
-
深入分析Unreal Engine 4源代码中的垃圾回收机制