时间:2021-02-05 09:39:17 | 栏目:Golang | 点击:次
在本节,我们对Go语言所提供的与锁有关的API进行说明。这包括了互斥锁和读写锁。我们在第6章描述过互斥锁,但却没有提到过读写锁。这两种锁对于传统的并发程序来说都是非常常用和重要的。
一、互斥锁
互斥锁是传统的并发程序对共享资源进行访问控制的主要手段。它由标准库代码包sync中的Mutex结构体类型代表。sync.Mutex类型(确切地说,是*sync.Mutex类型)只有两个公开方法――Lock和Unlock。顾名思义,前者被用于锁定当前的互斥量,而后者则被用来对当前的互斥量进行解锁。
类型sync.Mutex的零值表示了未被锁定的互斥量。也就是说,它是一个开箱即用的工具。我们只需对它进行简单声明就可以正常使用了,就像这样:
mutex.Lock()
在我们使用其他编程语言(比如C或Java)的锁类工具的时候,可能会犯的一个低级错误就是忘记及时解开已被锁住的锁,从而导致诸如流程执行异常、线程执行停滞甚至程序死锁等等一系列问题的发生。然而,在Go语言中,这个低级错误的发生几率极低。其主要原因是有defer语句的存在。
我们一般会在锁定互斥锁之后紧接着就用defer语句来保证该互斥锁的及时解锁。请看下面这个函数:
func write() {
mutex.Lock()
defer mutex.Unlock()
// 省略若干条语句
}
函数write中的这条defer语句保证了在该函数被执行结束之前互斥锁mutex一定会被解锁。这省去了我们在所有return语句之前以及异常发生之时重复的附加解锁操作的工作。在函数的内部执行流程相对复杂的情况下,这个工作量是不容忽视的,并且极易出现遗漏和导致错误。所以,这里的defer语句总是必要的。在Go语言中,这是很重要的一个惯用法。我们应该养成这种良好的习惯。
对于同一个互斥锁的锁定操作和解锁操作总是应该成对的出现。如果我们锁定了一个已被锁定的互斥锁,那么进行重复锁定操作的Goroutine将会被阻塞,直到该互斥锁回到解锁状态。请看下面的示例:
var mutex sync.Mutex
fmt.Println("Lock the lock. (G0)")
mutex.Lock()
fmt.Println("The lock is locked. (G0)")
for i := 1; i <= 3; i++ {
go func(i int) {
fmt.Printf("Lock the lock. (G%d)\n", i)
mutex.Lock()
fmt.Printf("The lock is locked. (G%d)\n", i)
}(i)
}
time.Sleep(time.Second)
fmt.Println("Unlock the lock. (G0)")
mutex.Unlock()
fmt.Println("The lock is unlocked. (G0)")
time.Sleep(time.Second)
}
我们把执行repeatedlyLock函数的Goroutine称为G0。而在repeatedlyLock函数中,我们又启用了3个Goroutine,并分别把它们命名为G1、G2和G3。可以看到,我们在启用这3个Goroutine之前就已经对互斥锁mutex进行了锁定,并且在这3个Goroutine将要执行的go函数的开始处也加入了对mutex的锁定操作。这样做的意义是模拟并发地对同一个互斥锁进行锁定的情形。当for语句被执行完毕之后,我们先让G0小睡1秒钟,以使运行时系统有充足的时间开始运行G1、G2和G3。在这之后,解锁mutex。为了能够让读者更加清晰地了解到repeatedlyLock函数被执行的情况,我们在这些锁定和解锁操作的前后加入了若干条打印语句,并在打印内容中添加了我们为这几个Goroutine起的名字。也由于这个原因,我们在repeatedlyLock函数的最后再次编写了一条“睡眠”语句,以此为可能出现的其他打印内容再等待一小会儿。
经过短暂的执行,标准输出上会出现如下内容:
The lock is locked. (G0)
Lock the lock. (G1)
Lock the lock. (G2)
Lock the lock. (G3)
Unlock the lock. (G0)
The lock is unlocked. (G0)
The lock is locked. (G1)
从这八行打印内容中,我们可以清楚的看出上述四个Goroutine的执行情况。首先,在repeatedlyLock函数被执行伊始,对互斥锁的第一次锁定操作便被进行并顺利地完成。这由第一行和第二行打印内容可以看出。而后,在repeatedlyLock函数中被启用的那三个Goroutine在G0的第一次“睡眠”期间开始被运行。当相应的go函数中的对互斥锁的锁定操作被进行的时候,它们都被阻塞住了。原因是该互斥锁已处于锁定状态了。这就是我们在这里只看到了三个连续的Lock the lock. (G<i>)而没有立即看到The lock is locked. (G<i>)的原因。随后,G0“睡醒”并解锁互斥锁。这使得正在被阻塞的G1、G2和G3都会有机会重新锁定该互斥锁。但是,只有一个Goroutine会成功。成功完成锁定操作的某一个Goroutine会继续执行在该操作之后的语句。而其他Goroutine将继续被阻塞,直到有新的机会到来。这也就是上述打印内容中的最后三行所表达的含义。显然,G1抢到了这次机会并成功锁定了那个互斥锁。
实际上,我们之所以能够通过使用互斥锁对共享资源的唯一性访问进行控制正是因为它的这一特性。这有效的对竞态条件进行了消除。
互斥锁的锁定操作的逆操作并不会引起任何Goroutine的阻塞。但是,它的进行有可能引发运行时恐慌。更确切的讲,当我们对一个已处于解锁状态的互斥锁进行解锁操作的时候,就会已发一个运行时恐慌。这种情况很可能会出现在相对复杂的流程之中――我们可能会在某个或多个分支中重复的加入针对同一个互斥锁的解锁操作。避免这种情况发生的最简单、有效的方式依然是使用defer语句。这样更容易保证解锁操作的唯一性。
虽然互斥锁可以被直接的在多个Goroutine之间共享,但是我们还是强烈建议把对同一个互斥锁的成对的锁定和解锁操作放在同一个层次的代码块中。例如,在同一个函数或方法中对某个互斥锁的进行锁定和解锁。又例如,把互斥锁作为某一个结构体类型中的字段,以便在该类型的多个方法中使用它。此外,我们还应该使代表互斥锁的变量的访问权限尽量的低。这样才能尽量避免它在不相关的流程中被误用,从而导致程序不正确的行为。
互斥锁是我们见到过的众多同步工具中最简单的一个。只要遵循前面提及的几个小技巧,我们就可以以正确、高效的方式使用互斥锁,并用它来确保对共享资源的访问的唯一性。下面我们来看看稍微复杂一些的锁实现――读写锁。
二、读写锁
读写锁即是针对于读写操作的互斥锁。它与普通的互斥锁最大的不同就是,它可以分别针对读操作和写操作进行锁定和解锁操作。读写锁遵循的访问控制规则与互斥锁有所不同。在读写锁管辖的范围内,它允许任意个读操作的同时进行。但是,在同一时刻,它只允许有一个写操作在进行。并且,在某一个写操作被进行的过程中,读操作的进行也是不被允许的。也就是说,读写锁控制下的多个写操作之间都是互斥的,并且写操作与读操作之间也都是互斥的。但是,多个读操作之间却不存在互斥关系。
这样的规则对于针对同一块数据的并发读写来讲是非常贴切的。因为,无论读操作的并发量有多少,这些操作都不会对数据本身造成变更。而写操作不但会对同时进行的其他写操作进行干扰,还有可能造成同时进行的读操作的结果的不正确。例如,在32位的操作系统中,针对int64类型值的读操作和写操作都不可能只由一个CPU指令完成。在一个写操作被进行的过程当中,针对同一个只的读操作可能会读取到未被修改完成的值。该值既不与旧的值相等,也不等于新的值。这种错误往往不易被发现,且很难被修正。因此,在这样的场景下,读写锁可以在大大降低因使用锁而对程序性能造成的损耗的情况下完成对共享资源的访问控制。
在Go语言中,读写锁由结构体类型sync.RWMutex代表。与互斥锁类似,sync.RWMutex类型的零值就已经是立即可用的读写锁了。在此类型的方法集合中包含了两对方法,即:
func (*RWMutex) Unlock
和
func (*RWMutex) RUnlock
前一对方法的名称和签名与互斥锁的那两个方法完全一致。它们分别代表了对写操作的锁定和解锁。以下简称它们为写锁定和写解锁。而后一对方法则分别表示了对读操作的锁定和解锁。以下简称它们为读锁定和读解锁。
对已被写锁定的读写锁进行写锁定,会造成当前Goroutine的阻塞,直到该读写锁被写解锁。当然,如果有多个Goroutine因此而被阻塞,那么当对应的写解锁被进行之时只会使其中一个Goroutine的运行被恢复。类似的,对一个已被写锁定的读写锁进行读锁定,也会阻塞相应的Goroutine。但不同的是,一旦该读写锁被写解锁,那么所有因欲进行读锁定而被阻塞的Goroutine的运行都会被恢复。另一方面,如果在进行过程中发现当前的读写锁已被读锁定,那么这个写锁定操作将会等待直至所有施加于该读写锁之上的读锁定都被清除。同样的,在有多个写锁定操作为此而等待的情况下,相应的读锁定的全部清除只能让其中的某一个写锁定操作获得进行的机会。
现在来关注写解锁和读解锁。如果对一个未被写锁定的读写锁进行写解锁,那么会引发一个运行时恐慌。类似的,当对一个未被读锁定的读写锁进行读解锁的时候也会引发一个运行时恐慌。写解锁在进行的同时会试图唤醒所有因进行读锁定而被阻塞的Goroutine。而读解锁在进行的时候则会试图唤醒一个因进行写锁定而被阻塞的Goroutine。
无论锁定针对的是写操作还是读操作,我们都应该尽量及时的对相应的锁进行解锁。对于写解锁,我们自不必多说。而读解锁的及时进行往往更容易被我们忽视。虽说读解锁的进行并不会对其他正在进行中的读操作产生任何影响,但它却与相应的写锁定的进行关系紧密。注意,对于同一个读写锁来说,施加在它之上的读锁定可以有多个。因此,只有我们对互斥锁进行相同数量的读解锁,才能够让某一个相应的写锁定获得进行的机会。否则,后者会继续使进行它的Goroutine处于阻塞状态。由于sync.RWMutex和*sync.RWMutex类型都没有相应的方法让我们获得已进行的读锁定的数量,所以这里是很容易出现问题的。还好我们可以使用defer语句来尽量避免此类问题的发生。请记住,针对同一个读写锁的写锁定和读锁定是互斥的。无论是写解锁还是读解锁,操作的不及时都会对使用该读写锁的流程的正常执行产生负面影响。
除了我们在前面详细讲解的那两对方法之外,*sync.RWMutex类型还拥有另外一个方法――RLocker。这个RLocker方法会返回一个实现了sync.Locker接口的值。sync.Locker接口类型包含了两个方法,即:Lock和Unlock。细心的读者可能会发现,*sync.Mutex类型和*sync.RWMutex类型都是该接口类型的实现类型。实际上,我们在调用*sync.RWMutex类型值的RLocker方法之后所得到的结果值就是这个值本身。只不过,这个结果值的Lock方法和Unlock方法分别对应了针对该读写锁的读锁定操作和读解锁操作。换句话说,我们在对一个读写锁的RLocker方法的结果值的Lock方法或Unlock方法进行调用的时候实际上是在调用该读写锁的RLock方法或RUnlock方法。这样的操作适配在实现上并不困难。我们自己也可以很容易的编写出这些方法的实现。通过读写锁的RLocker方法获得这样一个结果值的实际意义在于,我们可以在之后以相同的方式对该读写锁中的“写锁”和“读锁”进行操作。这为相关操作的灵活适配和替换提供了方便。
三、锁的完整示例
我们下面来看一个与上述锁实现有关的示例。在Go语言的标准库代码包os中有一个名为File的结构体类型。os.File类型的值可以被用来代表文件系统中的某一个文件或目录。它的方法集合中包含了很多方法,其中的一些方法被用来对相应的文件进行写操作和读操作。
假设,我们需要创建一个文件来存放数据。在同一个时刻,可能会有多个Goroutine分别进行对此文件的进行写操作和读操作。每一次写操作都应该向这个文件写入若干个字节的数据。这若干字节的数据应该作为一个独立的数据块存在。这就意味着,写操作之间不能彼此干扰,写入的内容之间也不能出现穿插和混淆的情况。另一方面,每一次读操作都应该从这个文件中读取一个独立、完整的数据块。它们读取的数据块不能重复,且需要按顺序读取。例如,第一个读操作读取了数据块1,那么第二个读操作就应该去读取数据块2,而第三个读操作则应该读取数据块3,以此类推。对于这些读操作是否可以被同时进行,这里并不做要求。即使它们被同时进行,程序也应该分辨出它们的先后顺序。
为了突出重点,我们规定每个数据块的长度都是相同的。该长度应该在初始化的时候被给定。若写操作实际欲写入数据的长度超过了该值,则超出部分将会被截掉。
当我们拿到这样一个需求的时候,首先应该想到使用os.File类型。它为我们操作文件系统中的文件提供了底层的支持。但是,该类型的相关方法并没有对并发操作的安全性进行保证。换句话说,这些方法不是并发安全的。我只能通过额外的同步手段来保证这一点。鉴于这里需要分别对两类操作(即写操作和读操作)进行访问控制,所以读写锁在这里会比普通的互斥锁更加适用。不过,关于多个读操作要按顺序且不能重复读取的这个问题,我们需还要使用其他辅助手段来解决。
为了实现上述需求,我们需要创建一个类型。作为该类型的行为定义,我们先编写了一个这样的接口:
type DataFile interface {
// 读取一个数据块。
Read() (rsn int64, d Data, err error)
// 写入一个数据块。
Write(d Data) (wsn int64, err error)
// 获取最后读取的数据块的序列号。
Rsn() int64
// 获取最后写入的数据块的序列号。
Wsn() int64
// 获取数据块的长度
DataLen() uint32
}
其中,类型Data被声明为一个[]byte的别名类型:
type Data []byte
而名称wsn和rsn分别是Writing Serial Number和Reading Serial Number的缩写形式。它们分别代表了最后被写入的数据块的序列号和最后被读取的数据块的序列号。这里所说的序列号相当于一个计数值,它会从1开始。因此,我们可以通过调用Rsn方法和Wsn方法得到当前已被读取和写入的数据块的数量。
根据上面对需求的简单分析和这个DataFile接口类型声明,我们就可以来编写真正的实现了。我们将这个实现类型命名为myDataFile。它的基本结构如下:
type myDataFile struct {
f *os.File // 文件。
fmutex sync.RWMutex // 被用于文件的读写锁。
woffset int64 // 写操作需要用到的偏移量。
roffset int64 // 读操作需要用到的偏移量。
wmutex sync.Mutex // 写操作需要用到的互斥锁。
rmutex sync.Mutex // 读操作需要用到的互斥锁。
dataLen uint32 // 数据块长度。
}
类型myDataFile共有七个字段。我们已经在前面说明过前两个字段存在的意义。由于对数据文件的写操作和读操作是各自独立的,所以我们需要两个字段来存储两类操作的进行进度。在这里,这个进度由偏移量代表。此后,我们把woffset字段称为写偏移量,而把roffset字段称为读偏移量。注意,我们在进行写操作和读操作的时候会分别增加这两个字段的值。当有多个写操作同时要增加woffset字段的值的时候就会产生竞态条件。因此,我们需要互斥锁wmutex来对其加以保护。类似的,rmutex互斥锁被用来消除多个读操作同时增加roffset字段的值时产生的竞态条件。最后,由上述的需求可知,数据块的长度应该是在初始化myDataFile类型值的时候被给定的。这个长度会被存储在该值的dataLen字段中。它与DataFile接口中声明的DataLen方法是对应的。下面我们就来看看被用来创建和初始化DataFile类型值的函数NewDataFile。
关于这类函数的编写,读者应该已经驾轻就熟了。NewDataFile函数会返回一个DataFile类型值,但是实际上它会创建并初始化一个*myDataFile类型的值并把它作为它的结果值。这样可以通过编译的原因是,后者会是前者的一个实现类型。NewDataFile函数的完整声明如下:
f, err := os.Create(path)
if err != nil {
return nil, err
}
if dataLen == 0 {
return nil, errors.New("Invalid data length!")
}
df := &myDataFile{f: f, dataLen: dataLen}
return df, nil
}
可以看到,我们在创建*myDataFile类型值的时候只需要对其中的字段f和dataLen进行初始化。这是因为woffset字段和roffset字段的零值都是0,而在未进行过写操作和读操作的时候它们的值理应如此。对于字段fmutex、wmutex和rmutex来说,它们的零值即为可用的锁。所以我们也不必对它们进行显式的初始化。
把变量df的值作为NewDataFile函数的第一个结果值体现了我们的设计意图。但要想使*myDataFile类型真正成为DataFile类型的一个实现类型,我们还需要为*myDataFile类型编写出已在DataFile接口类型中声明的所有方法。其中最重要的当属Read方法和Write方法。
我们先来编写*myDataFile类型的Read方法。该方法应该按照如下步骤实现。
(1) 获取并更新读偏移量。
(2) 根据读偏移量从文件中读取一块数据。
(3) 把该数据块封装成一个Data类型值并将其作为结果值返回。
其中,前一个步骤在被执行的时候应该由互斥锁rmutex保护起来。因为,我们要求多个读操作不能读取同一个数据块,并且它们应该按顺序的读取文件中的数据块。而第二个步骤,我们也会用读写锁fmutex加以保护。下面是这个Read方法的第一个版本:
// 读取并更新读偏移量
var offset int64
df.rmutex.Lock()
offset = df.roffset
df.roffset += int64(df.dataLen)
df.rmutex.Unlock()
//读取一个数据块
rsn = offset / int64(df.dataLen)
df.fmutex.RLock()
defer df.fmutex.RUnlock()
bytes := make([]byte, df.dataLen)
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
return
}
d = bytes
return
}
可以看到,在读取并更新读偏移量的时候,我们用到了rmutex字段。这保证了可能同时运行在多个Goroutine中的这两行代码:
df.roffset += int64(df.dataLen)
的执行是互斥的。这是我们为了获取到不重复且正确的读偏移量所必需采取的措施。
另一方面,在读取一个数据块的时候,我们适时的进行了fmutex字段的读锁定和读解锁操作。fmutex字段的这两个操作可以保证我们在这里读取到的是完整的数据块。不过,这个完整的数据块却并不一定是正确的。为什么会这样说呢?
请想象这样一个场景。在我们的程序中,有3个Goroutine来并发的执行某个*myDataFile类型值的Read方法,并有2个Goroutine来并发的执行该值的Write方法。通过前3个Goroutine的运行,数据文件中的数据块被依次的读取了出来。但是,由于进行写操作的Goroutine比进行读操作的Goroutine少,所以过不了多久读偏移量roffset的值就会等于甚至大于写偏移量woffset的值。也就是说,读操作很快就会没有数据可读了。这种情况会使上面的df.f.ReadAt方法返回的第二个结果值为代表错误的非nil且会与io.EOF相等的值。实际上,我们不应该把这样的值看成错误的代表,而应该把它看成一种边界情况。但不幸的是,我们在这个版本的Read方法中并没有对这种边界情况做出正确的处理。该方法在遇到这种情况时会直接把错误值返回给它的调用方。该调用方会得到读取出错的数据块的序列号,但却无法再次尝试读取这个数据块。由于其他正在或后续执行的Read方法会继续增加读偏移量roffset的值,所以当该调用方再次调用这个Read方法的时候只可能读取到在此数据块后面的其他数据块。注意,执行Read方法时遇到上述情况的次数越多,被漏读的数据块也就会越多。为了解决这个问题,我们编写了Read方法的第二个版本:
// 读取并更新读偏移量
// 省略若干条语句
//读取一个数据块
rsn = offset / int64(df.dataLen)
bytes := make([]byte, df.dataLen)
for {
df.fmutex.RLock()
_, err = df.f.ReadAt(bytes, offset)
if err != nil {
if err == io.EOF {
df.fmutex.RUnlock()
continue
}
df.fmutex.RUnlock()
return
}
d = bytes
df.fmutex.RUnlock()
return
}
}
在上面的Read方法展示中,我们省略了若干条语句。原因在这个位置上的那些语句并没有任何变化。为了进一步节省篇幅,我们在后面也会遵循这样的省略原则。
第二个版本的Read方法使用for语句是为了达到这样一个目的:在其中的df.f.ReadAt方法返回io.EOF错误的时候继续尝试获取同一个数据块,直到获取成功为止。注意,如果在该for代码块被执行期间一直让读写锁fmutex处于读锁定状态,那么针对它的写锁定操作将永远不会成功,且相应的Goroutine也会被一直阻塞。因为它们是互斥的。所以,我们不得不在该for语句块中的每条return语句和continue语句的前面都加入一个针对该读写锁的读解锁操作,并在每次迭代开始时都对fmutex进行一次读锁定。显然,这样的代码看起来很丑陋。冗余的代码会使代码的维护成本和出错几率大大增加。并且,当for代码块中的代码引发了运行时恐慌的时候,我们是很难及时的对读写锁fmutex进行读解锁的。即便可以这样做,那也会使Read方法的实现更加丑陋。我们因为要处理一种边界情况而去掉了defer df.fmutex.RUnlock()语句。这种做法利弊参半。
其实,我们可以做得更好。但是这涉及到了其他同步工具。因此,我们以后再来对Read方法进行进一步的改造。顺便提一句,当df.f.ReadAt方法返回一个非nil且不等于io.EOF的错误值的时候,我们总是应该放弃再次获取目标数据块的尝试而立即将该错误值返回给Read方法的调用方。因为这样的错误很可能是严重的(比如,f字段代表的文件被删除了),需要交由上层程序去处理。
现在,我们来考虑*myDataFile类型的Write方法。与Read方法相比,Write方法的实现会简单一些。因为后者不会涉及到边界情况。在该方法中,我们需要进行两个步骤,即:获取并更新写偏移量和向文件写入一个数据块。我们直接给出Write方法的实现:
// 读取并更新写偏移量
var offset int64
df.wmutex.Lock()
offset = df.woffset
df.woffset += int64(df.dataLen)
df.wmutex.Unlock()
//写入一个数据块
wsn = offset / int64(df.dataLen)
var bytes []byte
if len(d) > int(df.dataLen) {
bytes = d[0:df.dataLen]
} else {
bytes = d
}
df.fmutex.Lock()
df.fmutex.Unlock()
_, err = df.f.Write(bytes)
return
}
这里需要注意的是,当参数d的值的长度大于数据块的最大长度的时候,我们会先进行截短处理再将数据写入文件。如果没有这个截短处理,我们在后面计算的已读数据块的序列号和已写数据块的序列号就会不正确。
有了编写前面两个方法的经验,我们可以很容易的编写出*myDataFile类型的Rsn方法和Wsn方法:
df.rmutex.Lock()
defer df.rmutex.Unlock()
return df.roffset / int64(df.dataLen)
}
func (df *myDataFile) Wsn() int64 {
df.wmutex.Lock()
defer df.wmutex.Unlock()
return df.woffset / int64(df.dataLen)
}
这两个方法的实现分别涉及到了对互斥锁rmutex和wmutex的锁定操作。同时,我们也通过使用defer语句保证了对它们的及时解锁。在这里,我们对已读数据块的序列号rsn和已写数据块的序列号wsn的计算方法与前面示例中的方法是相同的。它们都是用相关的偏移量除以数据块长度后得到的商来作为相应的序列号(或者说计数)的值。
至于*myDataFile类型的DataLen方法的实现,我们无需呈现。它只是简单地将dataLen字段的值作为其结果值返回而已。
编写上面这个完整示例的主要目的是展示互斥锁和读写锁在实际场景中的应用。由于还没有讲到Go语言提供的其他同步工具,所以我们在相关方法中所有需要同步的地方都是用锁来实现的。然而,其中的一些问题用锁来解决是不足够或不合适的。我们会在本节的后续部分中逐步的对它们进行改进。
从这两种锁的源码中可以看出,它们是同源的。读写锁的内部是用互斥锁来实现写锁定操作之间的互斥的。我们可以把读写锁看做是互斥锁的一种扩展。除此之外,这两种锁实现在内部都用到了操作系统提供的同步工具――信号灯。互斥锁内部使用一个二值信号灯(只有两个可能的值的信号灯)来实现锁定操作之间的互斥,而读写锁内部则使用一个二值信号灯和一个多值信号灯(可以有多个可能的值的信号灯)来实现写锁定操作与读锁定操作之间的互斥。当然,为了进行精确的协调,它们还使用到了其他一些字段和变量。由于篇幅原因,我们就不在这里赘述了。如果读者对此感兴趣的话,可以去阅读sync代码包中的相关源码文件。