0%

Go 并发与同步机制

Go 并发与同步机制

Goroutine和Channels

goroutine 和 channels 支持“顺序通信进程”(communicating sequential processes)或被简称为 CSP。CSP 是一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中。传统的并发模型:多线程共享内存,如果你在其它的主流语言中写过并发程序的话可能会更熟悉一些。

Goroutine

Goroutine 不同于 thread,threads 是操作系统中的对于一个独立运行实例的描述,不同操作系统,对于 thread 的实现也不尽相同;但是,操作系统并不知道 goroutine 的存在,goroutine 的调度是由 Golang 运行时进行管理的。启动 thread 虽然比 process 所需的资源要少,但是多个 thread 之间的上下文切换仍然是需要大量的工作的(寄存器/Program Count/Stack Pointer/…),Golang 有自己的调度器,许多 goroutine 的数据都是共享的,因此 goroutine 之间的切换会快很多,启动 goroutine 所耗费的资源也很少,一个 Golang 程序同时存在几百个 goroutine 是很正常的。

Channel,即“管道”,是用来传递数据(叫消息更为合适)的一个数据结构,即可以从 channel 里面塞数据,也可以从中获取数据。channel 本身并没有什么神奇的地方,但是 channel 加上了 goroutine,就形成了一种既简单又强大的请求处理模型, 即 N 个工作 goroutine 将处理的中间结果或者最终结果放入一个 channel,另外有 M 个工作 goroutine 从这个 channel 拿数据,再进行进一步加工,通过组合这种过程,从而胜任各种复杂的业务模型。 当一个程序启动时,其主函数即在一个单独的 goroutine 中运行,我们叫它 main goroutine。新的 goroutine 会用 go 语句来创建。

在语法上,go 语句是一个普通的函数或方法调用前加上关键字 go。go 语句会使其语句中的函数在一个新创建的 goroutine 中运行。而 go 语句本身会迅速地完成。

f() //调用f()并等待返回
go f() //创建一个新 goroutine 执行 f() 不需要等待

Runtime

runtime.Gosched() 用于让出 CPU 时间片。

个人猜测与 Java 中 yield() 类似。原理以后深究。

go 中的 goroutins 并不是同时在运行。事实上,如果没有在代码中通过 runtime.GOMAXPROCS(n) 其中 n 是整数,指定使用多核的话,goroutins 都是在一个线程里的,它们之间通过不停的让出时间片轮流运行,达到类似同时运行的效果。当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutines 转移到另一个系统线程上去,以使这些 goroutines 不阻塞。

func showNumber(i int)  {
   fmt.Println(i)
}

func main()  {
   for i := 0; i < 10; i++ {
      go showNumber(i)
   }

   runtime.Gosched()
   fmt.Println("End")
}

Channels

如果说 goroutine 是 Go 语言程序的并发体的话,那么 Channels 则是它们之间的通信机制。通过指定一个特定类型的 Channel,来建立两个 goroutine 之间的数据通信通道。

Channels的基本操作

ch := make(chan int)

Channel 对应一个 make 创建的底层数据结构的引用,当复制一个 Channel 或用于函数参数传递时,我们只是拷贝了一个 Channel 引用,因此调用者和被调用者将引用同一个 Channel 对象。

而对于 Channels 的值操作,分为发送和接收。

// x is specific type which is defined channel
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded

Channels 还支持 close 操作,用于关闭 Channels,随后对基于该 Channel 的任何发送操作都将导致 panic 异常。对一个已经被 close 过的 Channel 进行接收操作依然可以接受到之前已经成功发送的数据;如果 Channel 中已经没有数据的话将产生一个零值的数据。

close(ch) 

在方法参数的使用上,还有一种特殊用法。通过给 Channels 指定方向来避免 Channels 的滥用

//chan<- 只负责发送数据的Channel <-chan 只负责接收数据的Channel
func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)
	//...
    go squarer(squares, naturals)
	//...
}

Channels的类型

以最简单方式调用 make 函数创建的是一个无缓存的 Channel,但是我们也可以指定第二个整型参数,对应 Channel 的容量。如果 Channel 的容量大于零,那么该 Channel 就是带缓存的 Channel。

ch = make(chan int) // unbuffered channel 
ch = make(chan int, 0) // unbuffered channel 
ch = make(chan int, 3) // buffered channel with capacity 3

无缓存的Channels

一个基于无缓存 Channel 的发送操作将导致发送者 goroutine 阻塞,直到另一个 goroutine 在相同的 Channels 上执行接收操作,当发送的值通过 Channels 成功传输 之后,两个 goroutine 可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者 goroutine 也将阻塞,直到有另一个 goroutine 在相同的 Channels 上执行发送操作。

基于无缓存 Channels 的发送和接收操作将导致两个 goroutine 做一次同步操作。因为这个原因,无缓存 Channels 有时候也被称为同步 Channels。当通过一个无缓存 Channels 发送数据时,接收者收到数据发生在唤醒发送者 goroutine 之前。

可缓存的Channels

带缓存的 Channel 内部持有一个元素队列。队列的最大容量是在调用 make 函数创建 Channel 时通过第二个参数指定的。

向缓存 Channel 的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个 goroutine 执行接收操作而释放了新的队列空间。相反,如果 Channel 是空的,接收操作将阻塞直到有另一个 goroutine 执行发送操作而向队列插入元素。

需要知道的是,我们可以使用cap()len()获取 Channel 内部缓存的容量和长度。

基于共享变量的并发

Channels信号量

下面使用了一个 buffered channel 作为一个计数信号量,来保证最多只有 20 个 goroutine 会同时执行 HTTP 请求。同理,我们可以用一个容量只有 1 的 channel 来保证最多只有一个 goroutine 在同一时刻访问一个共享变量。一个只能为 1 和 0 的信号量叫做二元信号量(binary semaphore)。

var (
    sema    = make(chan struct{}, 1) // a binary semaphore guarding balance
    balance int
)

func Deposit(amount int) {
    sema <- struct{}{} // acquire token
    balance = balance + amount
    <-sema // release token
}

func Balance() int {
    sema <- struct{}{} // acquire token
    b := balance
    <-sema // release token
    return b
}

sync.Mutex互斥锁

通过调用 mutex 的 Lock 方法来获取一个互斥锁。如果其它的 goroutine 已经获得了这个锁的话,这个操作会被阻塞直到其它 goroutine 调用了 Unlock 使该锁变回可用状态。mutex 会保护共享变量。惯例来说,被 mutex 所保护的变量是在 mutex 变量声明之后立刻声明的。如果你的做法和惯例不符,确保在文档里对你的做法进行说明。

在 Lock 和 Unlock 之间的代码段中的内容 goroutine 可以随便读取或者修改,这个代码段叫做临界区。锁的持有者在其他 goroutine 获取该锁之前需要调用 Unlock。goroutine 在结束后释放锁是必要的,无论以哪条路径通过函数都需要释放,即使是在错误路径中,也要记得释放。

import "sync"

var (
    mu      sync.Mutex // guards balance
    balance int
)

func Deposit(amount int) {
    mu.Lock()
    balance = balance + amount
    mu.Unlock()
}

func Balance() int {
    mu.Lock()
    b := balance
    mu.Unlock()
    return b
}

由于在存款和查询余额函数中的临界区代码这么短 —— 只有一行,没有分支调用——在代码最后去调用 Unlock 就显得更为直截了当。在更复杂的临界区的应用中,尤其是必须要尽早处理错误并返回的情况下,就很难去(靠人)判断对 Lock 和 Unlock 的调用是在所有路径中都能够严格配对的了。Go 语言里的 defer 简直就是这种情况下的救星:我们用 defer 来调用 Unlock,临界区会隐式地延伸到函数作用域的最后,这样我们就从“总要记得在函数返回之后或者发生错误返回时要记得调用一次 Unlock”这种状态中获得了解放。Go 会自动帮我们完成这些事情。

此外,一个 deferred Unlock 即使在临界区发生 panic 时依然会执行,这对于用 recover(§5.10)来恢复的程序来说是很重要的。defer 调用只会比显式地调用 Unlock 成本高那么一点点,不过却在很大程度上保证了代码的整洁性。大多数情况下对于并发程序来说,代码的整洁性比过度的优化更重要。如果可能的话尽量使用 defer 来将临界区扩展到函数的结束。

Note:sync.Mutex 为非重入锁

sync.RWMutex读写锁

“多读单写”锁(multiple readers, single writer lock)允许多个只读操作并行执行,但写操作会完全互斥。Go 语言提供的这样的锁是 sync.RWMutex:

var mu sync.RWMutex
var balance int
func Balance() int {
    mu.RLock() // readers lock
    defer mu.RUnlock()
    return balance
}

Balance函数现在调用了 RLock 和 RUnlock 方法来获取和释放一个读取或者共享锁。Deposit 函数没有变化,会调用 mu.Lock 和 mu.Unlock 方法来获取和释放一个写或互斥锁。

RWMutex 只有当获得锁的大部分 goroutine 都是读操作,而锁在竞争条件下,也就是说,goroutine 们必须等待才能获取到锁的时候,RWMutex 才是最能带来好处的。RWMutex 需要更复杂的内部记录,所以会让它比一般的无竞争锁的 mutex 慢一些。

内存同步

你可能比较纠结为什么 Balance 方法需要用到互斥条件,无论是基于 channel 还是基于互斥量。毕竟和存款不一样,它只由一个简单的操作组成,所以不会碰到其它 goroutine 在其执行“期间”执行其它逻辑的风险。这里使用 mutex 有两方面考虑。第一 Balance 不会在其它操作比如 Deposit “中间”执行。第二(更重要的)是“同步”不仅仅是一堆 goroutine 执行顺序的问题,同样也会涉及到内存的问题。

在现代计算机中可能会有一堆处理器,每一个都会有其本地缓存(local cache)。为了效率,对内存的写入一般会在每一个处理器中缓冲,并在必要时一起 flush 到主存。这种情况下这些数据可能会以与当初 goroutine 写入顺序不同的顺序被提交到主存。像 channel 通信或者互斥量操作这样的原语会使处理器将其聚集的写入 flush 并 commit,这样 goroutine 在某个时间点上的执行结果才能被其它处理器上运行的 goroutine 得到。

考虑一下下面代码片段的可能输出:

var x, y int
go func() {
    x = 1 // A1
    fmt.Print("y:", y, " ") // A2
}()
go func() {
    y = 1                   // B1
    fmt.Print("x:", x, " ") // B2
}()

因为两个 goroutine 是并发执行,并且访问共享变量时也没有互斥,会有数据竞争,所以程序的运行结果没法预测的话也请不要惊讶。我们可能希望它能够打印出下面这四种结果中的一种,相当于几种不同的交错执行时的情况:

y:0 x:1
x:0 y:1
x:1 y:1
y:1 x:1

第四行可以被解释为执行顺序 A1, B1, A2, B2 或者 B1, A1, A2, B2 的执行结果。然而实际运行时还是有些情况让我们有点惊讶:

x:0 y:0
y:0 x:0

sync.Once惰性初始化

如果初始化成本比较大的话,那么将初始化延迟到需要的时候再去做就是一个比较好的选择。如果在程序启动的时候就去做这类初始化的话,会增加程序的启动时间,并且因为执行的时候可能也并不需要这些变量,所以实际上有一些浪费。让我们来看在本章早一些时候的 icons 变量:

var icons map[string]image.Image

这个版本的 Icon 用到了懒初始化(lazy initialization)。

func loadIcons() {
    icons = map[string]image.Image{
        "spades.png":   loadIcon("spades.png"),
        "hearts.png":   loadIcon("hearts.png"),
        "diamonds.png": loadIcon("diamonds.png"),
        "clubs.png":    loadIcon("clubs.png"),
    }
}

// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons() // one-time initialization
    }
    return icons[name]
}

如果一个变量只被一个单独的 goroutine 所访问的话,我们可以使用上面的这种模板,但这种模板在 Icon 被并发调用时并不安全。因为缺少显式的同步,编译器和 CPU 是可以随意地去更改访问内存的指令顺序,以任意方式,只要保证每一个 goroutine 自己的执行顺序一致。其中一种可能 loadIcons 的语句重排是下面这样。它会在填写 icons 变量的值之前先用一个空 map 来初始化 icons 变量。

func loadIcons() {
    icons = make(map[string]image.Image)
    icons["spades.png"] = loadIcon("spades.png")
    icons["hearts.png"] = loadIcon("hearts.png")
    icons["diamonds.png"] = loadIcon("diamonds.png")
    icons["clubs.png"] = loadIcon("clubs.png")
}

因此,一个 goroutine 在检查 icons 是非空时,也并不能就假设这个变量的初始化流程已经走完了(译注:可能只是塞了个空map,里面的值还没填完,也就是说填值的语句都没执行完呢)。

最简单且正确的保证所有 goroutine 能够观察到 loadIcons 效果的方式,是用一个 mutex 来同步检查。

var mu sync.Mutex // guards icons
var icons map[string]image.Image

// Concurrency-safe.
func Icon(name string) image.Image {
    mu.Lock()
    defer mu.Unlock()
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}

然而使用互斥访问 icons 的代价就是没有办法对该变量进行并发访问,即使变量已经被初始化完毕且再也不会进行变动。这里我们可以引入一个允许多读的锁:

var mu sync.RWMutex // guards icons
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
    mu.RLock()
    if icons != nil {
        icon := icons[name]
        mu.RUnlock()
        return icon
    }
    mu.RUnlock()

    // acquire an exclusive lock
    mu.Lock()
    if icons == nil { // NOTE: must recheck for nil
        loadIcons()
    }
    icon := icons[name]
    mu.Unlock()
    return icon
}

上面的代码有两个临界区。goroutine 首先会获取一个读锁,查询 map,然后释放锁。如果条目被找到了(一般情况下),那么会直接返回。如果没有找到,那 goroutine 会获取一个写锁。不释放共享锁的话,也没有任何办法来将一个共享锁升级为一个互斥锁,所以我们必须重新检查 icons 变量是否为 nil,以防止在执行这一段代码的时候,icons 变量已经被其它 gorouine 初始化过了。

上面的模板使我们的程序能够更好的并发,但是有一点太复杂且容易出错。幸运的是,sync 包为我们提供了一个专门的方案来解决这种一次性初始化的问题:sync.Once。概念上来讲,一次性的初始化需要一个互斥量 mutex 和一个 boolean 变量来记录初始化是不是已经完成了;互斥量用来保护 boolean 变量和客户端数据结构。Do 这个唯一的方法需要接收初始化函数作为其参数。让我们用 sync.Once 来简化前面的 Icon 函数吧:

var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

每一次对 Do(loadIcons) 的调用都会锁定 mutex,并会检查 boolean 变量(译注:Go1.9 中会先判断 boolean 变量是否为 1(true),只有不为 1 才锁定 mutex,不再需要每次都锁定 mutex)。在第一次调用时, boolean 变量的值是 false,Do 会调用 loadIcons 并会将 boolean 变量设置为 true。随后的调用什么都不会做,但是 mutex 同步会保证 loadIcons 对内存(这里其实就是指 icons 变量啦)产生的效果能够对所有 goroutine 可见。用这种方式来使用 sync.Once 的话,我们能够避免在变量被构建完成之前和其它 goroutine 共享该变量。

竞争条件检测

即使我们小心到不能再小心,但在并发程序中犯错还是太容易了。幸运的是,Go 的 runtime 和工具链为我们装备了一个复杂但好用的动态分析工具,竞争检查器(the race detector)。

只要在 go build,go run 或者 go test 命令后面加上-race 的 flag,就会使编译器创建一个你的应用的“修改”版或者一个附带了能够记录所有运行期对共享变量访问工具的 test,并且会记录下每一个读或者写共享变量的 goroutine 的身份信息。另外,修改版的程序会记录下所有的同步事件,比如 go 语句,channel 操作,以及对(*sync.Mutex).Lock(*sync.WaitGroup).Wait等等的调用。(完整的同步事件集合是在The Go Memory Model文档中有说明,该文档是和语言文档放在一起的。译注:https://golang.org/ref/mem

竞争检查器会检查这些事件,会寻找在哪一个 goroutine 中出现了这样的case,例如其读或者写了一个共享变量,这个共享变量是被另一个 goroutine 在没有进行干预同步操作便直接写入的。这种情况也就表明了是对一个共享变量的并发访问,即数据竞争。这个工具会打印一份报告,内容包含变量身份,读取和写入的 goroutine 中活跃的函数的调用栈。这些信息在定位问题时通常很有用。

竞争检查器会报告所有的已经发生的数据竞争。然而,它只能检测到运行时的竞争条件;并不能证明之后不会发生数据竞争。所以为了使结果尽量正确,请保证你的测试并发地覆盖到了你的包。

由于需要额外的记录,因此构建时加了竞争检测的程序跑起来会慢一些,且需要更大的内存,即使是这样,这些代价对于很多生产环境的程序(工作)来说还是可以接受的。对于一些偶发的竞争条件来说,让竞争检查器来干活可以节省无数日夜的 debugging。

Goroutines和线程

goroutine 和操作系统的线程区别可以先忽略。尽管两者的区别实际上只是一个量的区别,但量变会引起质变的道理同样适用于 goroutine 和线程。现在正是我们来区分开两者的最佳时机。

动态栈

每一个 OS 线程都有一个固定大小的内存块(一般会是 2 MB)来做栈,这个栈会用来存储当前正在被调用或挂起(指在调用其它函数时)的函数的内部变量。这个固定大小的栈同时很大又很小。因为 2MB 的栈对于一个小小的 goroutine 来说是很大的内存浪费,比如对于我们用到的,一个只是用来 WaitGroup 之后关闭 channel 的 goroutine 来说。而对于 go 程序来说,同时创建成百上千个 goroutine 是非常普遍的,如果每一个 goroutine 都需要这么大的栈的话,那这么多的 goroutine 就不太可能了。除去大小的问题之外,固定大小的栈对于更复杂或者更深层次的递归函数调用来说显然是不够的。修改固定的大小可以提升空间的利用率,允许创建更多的线程,并且可以允许更深的递归调用,不过这两者是没法同时兼备的。

相反,一个 goroutine 会以一个很小的栈开始其生命周期,一般只需要 2KB。一个 goroutine 的栈,和操作系统线程一样,会保存其活跃或挂起的函数调用的本地变量,但是和 OS 线程不太一样的是,一个 goroutine 的栈大小并不是固定的;栈的大小会根据需要动态地伸缩。而 goroutine 的栈的最大值有 1GB,比传统的固定大小的线程栈要大得多,尽管一般情况下,大多 goroutine 都不需要这么大的栈。

Goroutine调度

OS 线程会被操作系统内核调度。每几毫秒,一个硬件计时器会中断处理器,这会调用一个叫作 scheduler 的内核函数。这个函数会挂起当前执行的线程并将它的寄存器内容保存到内存中,检查线程列表并决定下一次哪个线程可以被运行,并从内存中恢复该线程的寄存器信息,然后恢复执行该线程的现场并开始执行线程。因为操作系统线程是被内核所调度,所以从一个线程向另一个“移动”需要完整的上下文切换,也就是说,保存一个用户线程的状态到内存,恢复另一个线程的到寄存器,然后更新调度器的数据结构。这几步操作很慢,因为其局部性很差需要几次内存访问,并且会增加运行的 CPU 周期。

Go 的运行时包含了其自己的调度器,这个调度器使用了一些技术手段,比如 m:n 调度,因为其会在 n 个操作系统线程上多工(调度)m 个 goroutine。Go 调度器的工作和内核的调度是相似的,但是这个调度器只关注单独的 Go 程序中的 goroutine(译注:按程序独立)。

和操作系统的线程调度不同的是,Go 调度器并不是用一个硬件定时器,而是被Go语言“建筑”本身进行调度的。例如当一个 goroutine 调用了 time.Sleep,或者被 channel 调用或者 mutex 操作阻塞时,调度器会使其进入休眠并开始执行另一个 goroutine,直到时机到了再去唤醒第一个 goroutine。因为这种调度方式不需要进入内核的上下文,所以重新调度一个 goroutine 比调度一个线程代价要低得多。

GOMAXPROCS

Go 的调度器使用了一个叫做 GOMAXPROCS 的变量来决定会有多少个操作系统的线程同时执行 Go 的代码。其默认的值是运行机器上的 CPU 的核心数,所以在一个有 8 个核心的机器上时,调度器一次会在 8 个 OS 线程上去调度 GO 代码。(GOMAXPROCS 是前面说的 m:n 调度中的 n)。在休眠中的或者在通信中被阻塞的 goroutine 是不需要一个对应的线程来做调度的。在 I/O 中或系统调用中或调用非 Go 语言函数时,是需要一个对应的操作系统线程的,但是 GOMAXPROCS 并不需要将这几种情况计算在内。

可以用 GOMAXPROCS 的环境变量来显式地控制这个参数,或者也可以在运行时用 runtime.GOMAXPROCS 函数来修改它。

for {
    go fmt.Print(0)
    fmt.Print(1)
}

$ GOMAXPROCS=1 go run hacker-cliché.go
111111111111111111110000000000000000000011111...

$ GOMAXPROCS=2 go run hacker-cliché.go
010101010101010101011001100101011010010100110...

在第一次执行时,最多同时只能有一个 goroutine 被执行。初始情况下只有 main goroutine 被执行,所以会打印很多 1。过了一段时间后,GO 调度器会将其置为休眠,并唤醒另一个 goroutine,这时候就开始打印很多 0 了,在打印的时候,goroutine 是被调度到操作系统线程上的。在第二次执行时,我们使用了两个操作系统线程,所以两个 goroutine 可以一起被执行,以同样的频率交替打印 0 和 1。我们必须强调的是 goroutine 的调度是受很多因子影响的,而 runtime 也是在不断地发展演进的,所以这里的你实际得到的结果可能会因为版本的不同而与我们运行的结果有所不同。

Goroutine没有ID号

在大多数支持多线程的操作系统和程序语言中,当前的线程都有一个独特的身份(ID),并且这个身份信息可以以一个普通值的形式被很容易地获取到,典型的可以是一个 int 或者指针值。这种情况下我们做一个抽象化的 thread-local storage(线程本地存储,多线程编程中不希望其它线程访问的内容)就很容易,只需要以线程的 ID 作为 key 的一个 map 就可以解决问题,每一个线程以其 id 就能从中获取到值,且和其它线程互不冲突。

goroutine 没有可以被程序员获取到的身份(ID)的概念。这一点是设计上故意而为之,由于 thread-local storage 总是会被滥用。比如说,一个 Web Server 是用一种支持 tls 的语言实现的,而非常普遍的是很多函数会去寻找 HTTP 请求的信息,这代表它们就是去其存储层(这个存储层有可能是 tls)查找的。这就像是那些过分依赖全局变量的程序一样,会导致一种非健康的“距离外行为”,在这种行为下,一个函数的行为可能并不仅由自己的参数所决定,而是由其所运行在的线程所决定。因此,如果线程本身的身份会改变 —— 比如一些 Worker 线程之类的——那么函数的行为就会变得神秘莫测。

Go 鼓励更为简单的模式,这种模式下参数(译注:外部显式参数和内部显式参数。tls 中的内容算是”外部”隐式参数)对函数的影响都是显式的。这样不仅使程序变得更易读,而且会让我们自由地向一些给定的函数分配子任务时不用担心其身份信息影响行为。