说一说Kotlin协程中的同步锁——Mutex

随笔8个月前发布 江风海韵
82 0 0

前言

在多线程并发的情况下会很容易出现同步问题,这时候就需要使用各种锁来避免这些问题,在java开发中,最常用的就是使用synchronized。kotlin的协程也会遇到这样的问题,因为在协程线程池中会同时存在多个运行的Worker,每一个Worker都是一个线程,这样也会有并发问题。

虽然kotlin中也可以使用synchronized,但是有很大的问题。因为synchronized当获取不到锁的时候,会阻塞线程,这样这个线程一段时间内就无法处理其他任务,这不符合协程的思想。为此,kotlin提供了一个协程中可以使用的同步锁——Mutex

Mutex

Mutex使用起来也非常简单,只有几个函数lock、unlock、tryLock,一看名字就知道是什么。还有一个holdsLock,就是返回当前锁的状态。

这里要注意,lock和unlock必须成对出现,tryLock返回true的之后也必须在使用完执行unlock。这样使用的时候就比较麻烦,所以kotlin还提供了一个扩展函数withLock,它与synchronized类似,会在代码执行完成或异常的时候自动释放锁,这样就避免了忘记释放锁导致程序出错的情况。

withLock

withLock的代码如下:

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract { 
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }

    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

代码非常简单,就是先lock一下,然后执行代码,最终在finally中释放锁,这样就保证了锁一定会被释放。

lock

这样一看mutex好像跟synchronized或其他java的锁差不多,那么为什么它是如何解决线程阻塞的问题呢。

这就要从lock和unlock的流程中来看,先来看看lock:

public override suspend fun lock(owner: Any?) {
    // fast-path -- try lock
    if (tryLock(owner)) return
    // slow-path -- suspend
    return lockSuspend(owner)
}

先是通过tryLock来获取锁,如果获取到了就直接返回执行代码。重点来看获取不到是如何处理的,获取不到的时候会执行lockSuspend,它的代码如下:

private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
    var waiter = LockCont(owner, cont)  //1
    _state.loop { state ->
        when (state) {
            is Empty -> {
                if (state.locked !== UNLOCKED) {  //2
                    _state.compareAndSet(state, LockedQueue(state.locked)) //3
                } else {
                    // try lock
                    val update = if (owner == null) EMPTY_LOCKED else Empty(owner)
                    if (_state.compareAndSet(state, update)) { // locked
                        cont.resume(Unit) { unlock(owner) } //4
                        return@sc
                    }
                }
            }
            is LockedQueue -> {
                val curOwner = state.owner
                check(curOwner !== owner) { "Already locked by $owner" }

                state.addLast(waiter)  //5

                if (_state.value === state || !waiter.take()) {  //6
                    // added to waiter list
                    cont.removeOnCancellation(waiter)
                    return@sc
                }

                waiter = LockCont(owner, cont)
                return@loop
            }
            is OpDescriptor -> state.perform(this) // help
            else -> error("Illegal state $state")
        }
    }
}

可以看到这个函数是被suspend修饰的,所以这个是可挂起的函数,当执行到这里的时候线程就被挂起了,如果没有立刻恢复,而且有其他任务,那么线程就可以先执行其他任务,这样就不会阻塞住了。那么是如何恢复的。

函数一开始创建了一个LockCont对象waiter,这个是后面的关键,不过现在还用不到。

Empty

继续看根据不同的状态执行不同的代码,先看看Empty(等待列表为空)状态,再判断一下当前是否加锁(代码2),如果不是非加锁则将状态设置为LockedQueue状态(代码3);如果当前是非加锁,则获取锁,获取到之后执行resume来唤醒线程来执行后续代码(代码4),这种情况基本就是立刻获取到锁,所以不在这里细说了。

上面说了如果等待列表为空并且无法立刻获取锁,就会切换到LockedQueue状态(代码3),所以只要当前无法获取锁,最终都会进行LockedQueue状态,那么来看看这个状态怎么处理的。

LockedQueue

这个状态会就将函数一开始创建的waiter添加到state中(代码5),然后还是再判断一次当前状态,因为这时候可能锁的状态已经改变了,如果没有变则直接就返回了。

注意看到每个状态里,都会反复的校验当前锁的状态。

可以看到在LockedQueue这个流程结束后并没有恢复线程,线程则一直是挂起状态,所以在恢复之前线程是可以处理其他事务的。

那么线程何时恢复?

unlock

来看看unlock代码:

override fun unlock(owner: Any?) {
    _state.loop { state ->
        when (state) {
            is Empty -> {
                ...
            }
            is OpDescriptor -> state.perform(this)
            is LockedQueue -> {
                if (owner != null)
                    check(state.owner === owner) { "Mutex is locked by ${state.owner} but expected $owner" }
                val waiter = state.removeFirstOrNull()  //1
                if (waiter == null) {
                    ...
                } else {
                    if ((waiter as LockWaiter).tryResumeLockWaiter()) { //2
                        state.owner = waiter.owner ?: LOCKED
                        waiter.completeResumeLockWaiter() //3
                        return
                    }
                }
            }
            else -> error("Illegal state $state")
        }
    }
}

上面我们将waiter放入了等待队列中,这时候状态是LockedQueue,所以在unlock函数中我们直接看这个状态的代码。

代码1处从state中取出第一个元素,即waiter。前一个释放锁之后,就会把锁分配给这个waiter。然后在代码2处执行了它的tryResumeLockWaiter函数,如果返回false,还会执行它的completeResumeLockWaiter函数。

LockCont

上面知道waiter是一个LockCont对象,我们来看看它的源码:

private inner class LockCont(
    owner: Any?,
    private val cont: CancellableContinuation<Unit>
) : LockWaiter(owner) {

    override fun tryResumeLockWaiter(): Boolean {
        if (!take()) return false
        return cont.tryResume(Unit, idempotent = null) {
            unlock(owner)
        } != null
    }

    override fun completeResumeLockWaiter() = cont.completeResume(RESUME_TOKEN)
    ...
}

可以看到在tryResumeLockWaiter函数中会执行cont的tryResume来尝试唤醒它对应的线程来执行代码。

如果这个动作没有成功,最后会在completeResumeLockWaiter函数中执行cont的completeResume来唤醒线程。

总结

Mutex的内部逻辑其实并不复杂,如果获取不到锁则会挂起线程并加入到等待队列中,等获取到锁的时候在唤醒线程来执行代码。而这段时间内线程,或者说Worker可以执行其他任务,这样不会阻塞线程,最大的利用了线程的资源,这就很kotlin。

所以大家在处理协程的同步问题的时候,尽量使用Mutex这种Kotlin专门为协程开发的工具,这样才能更好的发挥协程的能力。

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
暂无评论...