读写锁的特点:读和读不互斥,读和写是互斥的,写和写也是互斥的
原理:
const rwmutexMaxReaders = 1 << 30 :获取读锁的协程的数量最多为2的30次方
type RWMutex struct {
w Mutex // 互斥锁,获取写锁的时候,写协程先获取互斥锁,防止其它协程抢夺,每次只允许一个协程获取写锁
写协程获取互斥锁后和读协程按照先后顺序运行
writerSem uint32 // 写协程的信号,一个写协程在上面等待
readerSem uint32 // 读协程的信号,多个读协程在上面等待
readerCount atomic.Int32 // 读的协程的数量,是一个32位的原子数,是一个正数,也可能是一个负数
获取读锁的协程,该协程称为为读协程,加读锁过程:先对readerCount加1,如果+1后结果小于0就说明有一个写协程对其减少了rwmutexMaxReaders,此时,该读协程将进入等待队列进行等待,直到写协程完成,写协程完成后会对其加rwmutexMaxReaders得到一个数r,如果r等于0,说明没有等待的读协程。如果大于0,说明有r个读协程在等待,需要一一唤醒,等唤醒完成后需要解除互斥锁,允许其它协程获取写锁.
读协程解锁的过程:先对readerCount减少一,如果减少一之后小于0,说明有一个写的协程在等待,这个时候需要将readerWait(正在运行的读协程的数量,也就是写协程等待前正在运行的读协程的数量)减少1,当最后一个正读协程解锁后,就会唤醒等待的写协程。
写协程的加锁过程:先获取互斥锁,只能允许一个协程进入,此时该协程称为写协程,写协程首先将readerCount减去rwmutexMaxReaders。获取一个新值z,将z与rwmutexMaxReaders得到数字m。
当m大于0时,说明在该写协程之前有m个读协程正在运行,需要等待这m个读协程运行完。此时该协程需要将readerWait设置为m,等待唤醒。唤醒过程就是读协程的解锁过程
当m等于0时,该写协程获取写锁成功
写协程的解锁过程:readerCount先加上rwmutexMaxReaders,如果返回的值r大于0,说明在写协程之后,有几个读协程正在等待,此时需要将其一一唤醒,然后再释放掉互斥锁
readerWait atomic.Int32 // 写协程需要等待这些读协程全部完成,当最后一个读协程完成之后,就会唤醒等待的写协程
}
读协程获取锁的过程:对readerCount加1,如果小于0,说明写协程在它之前运行,需要等待写协程运行完
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
读协程解锁的过程:
将readerCount减少1,如果结果小于0,说明有一个写协程在等待,这个时候需要等最后一个读协程运行完,将写协程唤醒
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := rw.readerCount.Add(-1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
fatal("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if rw.readerWait.Add(-1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
写协程加锁的过程:先获取互斥锁,然后将readerCount减少rwmutexMaxReaders,再判断在它之前有多少个读协程正在运行,如果没有读协程正在运行,那么它将运行,否则将readerWait设置为需要等待的读协程数量,然后进入等待状态,等最后一个读协程运行完,将其唤醒
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
写协程解锁的过程:先对readerCount加上rwmutexMaxReaders,如果结果大于或者等于rwmutexMaxReaders,就说明解锁了多次,需要panic。假如大于0小于 rwmutexMaxReaders,就说明有读协程在等待,需要一一唤醒,等唤醒后,需要释放互斥锁
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
注意:无论是读锁还是写锁只能释放一次,否则会发生panic。所以为了避免重复释放,最好使用defer进行释放