前言
在多线程并发的情况下会很容易出现同步问题,这时候就需要使用各种锁来避免这些问题,在java开发中,最常用的就是使用synchronized。kotlin的协程也会遇到这样的问题,因为在协程线程池中会同时存在多个运行的Worker,每一个Worker都是一个线程,这样也会有并发问题。
虽然kotlin中也可以使用synchronized,但是有很大的问题。因为synchronized当获取不到锁的时候,会阻塞线程,这样这个线程一段时间内就无法处理其他任务,这不符合协程的思想。为此,kotlin提供了一个协程中可以使用的同步锁——Mutex
Mutex
Mutex使用起来也非常简单,只有几个函数lock、unlock、tryLock,一看名字就知道是什么。还有一个holdsLock,就是返回当前锁的状态。
这里要注意,lock和unlock必须成对出现,tryLock返回true的之后也必须在使用完执行unlock。这样使用的时候就比较麻烦,所以kotlin还提供了一个扩展函数withLock,它与synchronized类似,会在代码执行完成或异常的时候自动释放锁,这样就避免了忘记释放锁导致程序出错的情况。
withLock
withLock的代码如下:
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
}
lock(owner)
try {
return action()
} finally {
unlock(owner)
}
}
代码非常简单,就是先lock一下,然后执行代码,最终在finally中释放锁,这样就保证了锁一定会被释放。
lock
这样一看mutex好像跟synchronized或其他java的锁差不多,那么为什么它是如何解决线程阻塞的问题呢。
这就要从lock和unlock的流程中来看,先来看看lock:
public override suspend fun lock(owner: Any?) {
// fast-path -- try lock
if (tryLock(owner)) return
// slow-path -- suspend
return lockSuspend(owner)
}
先是通过tryLock来获取锁,如果获取到了就直接返回执行代码。重点来看获取不到是如何处理的,获取不到的时候会执行lockSuspend,它的代码如下:
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
var waiter = LockCont(owner, cont) //1
_state.loop { state ->
when (state) {
is Empty -> {
if (state.locked !== UNLOCKED) { //2
_state.compareAndSet(state, LockedQueue(state.locked)) //3
} else {
// try lock
val update = if (owner == null) EMPTY_LOCKED else Empty(owner)
if (_state.compareAndSet(state, update)) { // locked
cont.resume(Unit) { unlock(owner) } //4
return@sc
}
}
}
is LockedQueue -> {
val curOwner = state.owner
check(curOwner !== owner) { "Already locked by $owner" }
state.addLast(waiter) //5
if (_state.value === state || !waiter.take()) { //6
// added to waiter list
cont.removeOnCancellation(waiter)
return@sc
}
waiter = LockCont(owner, cont)
return@loop
}
is OpDescriptor -> state.perform(this) // help
else -> error("Illegal state $state")
}
}
}
可以看到这个函数是被suspend修饰的,所以这个是可挂起的函数,当执行到这里的时候线程就被挂起了,如果没有立刻恢复,而且有其他任务,那么线程就可以先执行其他任务,这样就不会阻塞住了。那么是如何恢复的。
函数一开始创建了一个LockCont对象waiter,这个是后面的关键,不过现在还用不到。
Empty
继续看根据不同的状态执行不同的代码,先看看Empty(等待列表为空)状态,再判断一下当前是否加锁(代码2),如果不是非加锁则将状态设置为LockedQueue状态(代码3);如果当前是非加锁,则获取锁,获取到之后执行resume来唤醒线程来执行后续代码(代码4),这种情况基本就是立刻获取到锁,所以不在这里细说了。
上面说了如果等待列表为空并且无法立刻获取锁,就会切换到LockedQueue状态(代码3),所以只要当前无法获取锁,最终都会进行LockedQueue状态,那么来看看这个状态怎么处理的。
LockedQueue
这个状态会就将函数一开始创建的waiter添加到state中(代码5),然后还是再判断一次当前状态,因为这时候可能锁的状态已经改变了,如果没有变则直接就返回了。
注意看到每个状态里,都会反复的校验当前锁的状态。
可以看到在LockedQueue这个流程结束后并没有恢复线程,线程则一直是挂起状态,所以在恢复之前线程是可以处理其他事务的。
那么线程何时恢复?
unlock
来看看unlock代码:
override fun unlock(owner: Any?) {
_state.loop { state ->
when (state) {
is Empty -> {
...
}
is OpDescriptor -> state.perform(this)
is LockedQueue -> {
if (owner != null)
check(state.owner === owner) { "Mutex is locked by ${state.owner} but expected $owner" }
val waiter = state.removeFirstOrNull() //1
if (waiter == null) {
...
} else {
if ((waiter as LockWaiter).tryResumeLockWaiter()) { //2
state.owner = waiter.owner ?: LOCKED
waiter.completeResumeLockWaiter() //3
return
}
}
}
else -> error("Illegal state $state")
}
}
}
上面我们将waiter放入了等待队列中,这时候状态是LockedQueue,所以在unlock函数中我们直接看这个状态的代码。
代码1处从state中取出第一个元素,即waiter。前一个释放锁之后,就会把锁分配给这个waiter。然后在代码2处执行了它的tryResumeLockWaiter函数,如果返回false,还会执行它的completeResumeLockWaiter函数。
LockCont
上面知道waiter是一个LockCont对象,我们来看看它的源码:
private inner class LockCont(
owner: Any?,
private val cont: CancellableContinuation<Unit>
) : LockWaiter(owner) {
override fun tryResumeLockWaiter(): Boolean {
if (!take()) return false
return cont.tryResume(Unit, idempotent = null) {
unlock(owner)
} != null
}
override fun completeResumeLockWaiter() = cont.completeResume(RESUME_TOKEN)
...
}
可以看到在tryResumeLockWaiter函数中会执行cont的tryResume来尝试唤醒它对应的线程来执行代码。
如果这个动作没有成功,最后会在completeResumeLockWaiter函数中执行cont的completeResume来唤醒线程。
总结
Mutex的内部逻辑其实并不复杂,如果获取不到锁则会挂起线程并加入到等待队列中,等获取到锁的时候在唤醒线程来执行代码。而这段时间内线程,或者说Worker可以执行其他任务,这样不会阻塞线程,最大的利用了线程的资源,这就很kotlin。
所以大家在处理协程的同步问题的时候,尽量使用Mutex这种Kotlin专门为协程开发的工具,这样才能更好的发挥协程的能力。