文章目录
14 Go语言goroutine和通道详解14.1 goroutine14.2 通道(channel)声明通道变量创建通道通道操作 14.3 管道14.4 单向通道14.5 通道缓冲区非缓冲通道缓冲通道 14.6 通道多路复用select语句select语句的特殊应用
14 Go语言goroutine和通道详解
Go语言中通过goroutine来实现并发。goroutine是一种轻量级线程,在一台很普通的设备上,可以轻松开启成千上万的goroutine。
Go语言有两种并发编程风格。
第一种是CSP(通信顺序进程)模式,这种模式使用通道在goroutine之间通信。(本章介绍)第二种是传统的多线程模式,这种模式使用共享变量在goroutine之间的通信。(将在下一章介绍)14.1 goroutine
在Go中,每一个并发执行的活动称为goroutine,goroutine的调度是由Go语言运行时进行管理的。
当一个程序启动时,只有一个goroutine,该goroutine调用main函数,我们称它为主goroutine。主goroutine在程序启动时自动创建,新的goroutine需要通过go语句进行创建。在一个函数调用或方法调用前面加上go关键字,即可新建一个goroutine,在该goroutine中调用函数或方法,而当前goroutine中的go语句立即返回。
新建goroutine的语法:
go 函数名(参数列表)
示例:
package mainimport ( "fmt" "time")func myFunction(s string) { for i:= 0; i < 5; i++ { time.Sleep(100*time.Millisecond) fmt.Println(s) }}func main() { go myFunction("aaaaaa") myFunction("bbbbbb")}
执行以上代码,输出如下内容:
bbbbbbaaaaaabbbbbbaaaaaabbbbbbaaaaaabbbbbbaaaaaabbbbbbaaaaaabbbbbbaaaaaabbbbbbaaaaaabbbbbbaaaaaabbbbbbaaaaaabbbbbb
14.2 通道(channel)
通道用来在goroutine之间传送数据。一个通道是用来传送某个具体类型值的管道,该类型叫做通道的元素类型。一个具有int类型元素的通道写为chan int;一个具有string类型元素的通道写为chan string。
声明通道变量
// var_name是变量名, T是具体的通道元素类型var var_name chan T
通道的零值是nil。
创建通道
使用内置的make函数来创建一个通道:
// 创建一个元素类型为T的通道make(chan T)
像Map一样,通道是一个使用make创建的数据结构的引用。当复制或者作为函数参数时,复制的是引用。
通道操作
通道有两个主要操作:发送(send)和接收(receive),两者统称为通信。
send语句发送一个值到通道中,receive语句从通道中接收一个值,通道是先入先出的FIFO,即先发送进通道的值先被接收。两个操作都使用<-操作符。
// 将x发送到通道chch <- x// 从通道ch中接收一个值,存入chx = <- ch// 从通道ch中接受一个值,丢弃结果<- ch
通道还支持第三个操作:关闭(close)。它设置一个标志位来指示值当前已经发送完毕,后面不会再有值了。关闭后再做发送操作将导致宕机异常。在一个已关闭的通道上进行接收操作,将获取所有已经发送的值,直到通道为空,这时任何接收操作会立即完成,同时获取到一个通道元素类型对应的零值。
调用内置的close函数来关闭通道:
// 关闭通道chclose(ch)
14.3 管道
通道可以用来连接goroutine,这样一来,一个goroutine的输出就是另外一个goroutine的输入。这种使用方式叫管道。下面的程序由三个goroutine组成,他们被两个通道连接起来,如下图所示:
第一个goroutine是counter,产生一个0,1,2,3,…的整数序列,然后通过一个管道发送给第二个goroutine(叫squarer),计算数的平方,然后将结果通过另一个管道发送给第三个goroutine(叫printer),接收值并输出他们。
示例:
func main() { naturals := make(chan int) squares := make(chan int) // counter go func() { for x := 0; ; x++ { naturals <- x time.Sleep(100*time.Millisecond) } }() // squarer go func() { for { x := <- naturals squares <- (x*x) } }() // printer for { fmt.Println(<-squares) }}
上面的程序无限的输出平方序列0,1,4,9,…。如果想要通过通道发送有限的数字怎么办?
如果发送方直到没有更多的数据要发送,需要告诉接收者goroutine停止等待通道种的数据,这可以通过关闭通道来实现。
close(ch)
在通道关闭后,任何后续的发送操作将会导致应用崩溃。当关闭的通道被读完(就是最后一个发送的值被接收)后,所有后续的接收操作顺畅进行,只是获取到的是零值。关闭naturals通道导致squarer从通道接收的都是0,并将平方结果0传递给printer。
没有一个直接的方式来判断通道是否已经关闭,但是通道的接收操作还有一个变种:当使用两个变量来接收通道时,第一个变量接收通道中的元素值,第二个变量接收通道是否已被关闭且读完。利用这个特性,可以修改上面的程序,只输出0到100的平方。
func main() { naturals := make(chan int) squares := make(chan int) // counter go func() { for x := 0; x <= 100; x++ { naturals <- x time.Sleep(100*time.Millisecond) }close(naturals) }() // squarer go func() { for { x, ok := <- naturalsif !ok {break}squares <- (x*x) }close(squares) }() // printer for {x, ok := <-squaresif !ok {break} fmt.Println(x) }}
Go语言的for-range循环也支持从通道中读元素,并且当通道被关闭的时候,for-range循环退出。可以使用for-range循环简化上面的程序。
func main() { naturals := make(chan int) squares := make(chan int) // counter go func() { for x := 0; x <= 100; x++ { naturals <- x time.Sleep(100*time.Millisecond) }close(naturals) }() // squarer go func() {for x := range naturals {squares <- (x*x)}close(squares) }() // printerfor x := range squares { fmt.Println(x) }}
14.4 单向通道
前面的例子中,counter只向通道naturals发送值,squarer只从通道naturals接收值;squarer只向通道squares发送值,printer仅从通道squares接收值。但在程序实现的时候,如果写错了发送方向,编译器并不能帮我们检查出来。
能否为通道的操作限定为单一的方向呢(只能用于发送、或只能用于接收)?答案是可以。
Go语言提供了单向通道类型,有两种:
类型chan<- T
是一个只能发送的通道,不允许对其执行接收操作类型<-chan T
是一个只能接收的通道,不允许对其执行发送操作 close操作可用在仅能发送的通道上,不可用在仅能接收的通道上。
使用单向通道变量仅能执行单向操作,当执行非法操作时,Go语言在编译时就检查出错误。
让我们再一次修改前面的输出平方值的例子,这次使用单向通道:
func counter(out chan<- int) {for x := 0; x <= 100; x++ {out <- xtime.Sleep(100*time.Millisecond)}close(out)}func squarer(out chan<- int, in <-chan int) {for x := range in {out <- (x*x)}close(out)}func printer(in <-chan int) {for x := range in { fmt.Println(x) }}func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals)go squarer(squares, naturals)printer(squares)}
counter(naturals)调用隐式地将chan int
类型转化为参数要求的chan<- int
类型,printer(squares)调用做了类似的转换。将双向通道转换为单向通道是允许的,但是反过来是不行的。
14.5 通道缓冲区
使用前面的make调用创建的通道叫无缓冲通道,但make还可以接收第二个可选参数——一个表示通道容量的整数,用来创建一个带缓冲的通道。如果容量是0,make创建的还是一个无缓冲通道。
ch = make(chan int)// 无缓冲通道ch = make(chan int, 0)// 无缓冲通道ch = make(chan int, 1)// 缓冲容量为1的通道ch = make(chan int, 3)// 传冲容量为3的通道
非缓冲通道
无缓冲通道上的发送操作将会阻塞,直到另一个goroutine在对应的通道上执行接收操作,这时值传送完成,两个goroutine都可以继续执行。相反,如果接收操作先执行,接收方goroutine将阻塞,直到另一个goroutine在同一个通道上发送一个值。
使用无缓冲通道进行的通信导致发送和接收方goroutine同步化。因此,无缓冲通道也称为同步通道。
缓冲通道
缓冲通道有一个元素队列,队列的最大长度在创建时通过make函数的第二个参数来设置。下面的语句创建一个可以容纳三个字符串的缓冲通道。
ch = make(chan string, 3)
缓冲通道上的发送操作在队列的尾部插入一个元素,接收操作从队列的头部移除一个元素。如果填满了,发送操作会阻塞所在的goroutine直到另一个goroutine对它进行接收操作来留出可用空间。反过来,如果通道是空的,执行接收操作的goroutine阻塞,直到另一个goroutine在通道上发送数据。
可以在上面例子的通道种无阻塞的发送三个值:
ch <- "A"ch <- "B"ch <- "C"
这时通道是满的,再向通道发送值将阻塞当前goroutine。
ch <- "D" // 将会阻塞
直到有goroutine从通道接收了一个值,前面被阻塞的goroutine被唤醒,然后向通道发送"D":
<-ch// "A"
缓冲通道的缓冲区将发送和接收goroutine解耦了,将这两方的goroutine异步化了。
程序如果想直到缓冲区的容量,可以通过调用内置的cap函数来获取:
fmt.Println(cap(ch))// 获取通道的容量
当使用len函数时,将获取到通道种的元素个数。但是在并发程序中,得到的元素个数会随着程序的运行很快过期,所以这个信息的价值很低,但在错误诊断和性能优化时很有用。
14.6 通道多路复用
select语句
让我们来看一个例子。下面的程序对火箭发射进行倒计时,其中的time.Tick函数返回一个通道,它定期向通道中发送事件,每个事件的值是一个事件戳。然后程序通过一个倒计时循环从tick通道中接收事件,当接收了10次以后,循环退出,然后执行launch():
func main() { fmt.Println("Commencing countdown.") tick := time.Tick(1*time.Second) for countdown := 10; countdown > 0; countdown-- { fmt.Println(countdown) <-tick } launch()}func launch() { fmt.Println("lanuch")}
让我们为其添加一个功能:当在倒计时时,我们可以通过按回车键来取消发射。首先,启动一个goroutine从标准输入读取一个字符,如果成功,发送一个值到abort通道:
abort := make(chan struct{})go func() { os.Stdin.Read(make([]byte, 1))// 读取单个字节 abort <- struct{}{}}()
现在每一次倒计时迭代需要等待两个通道中的任何一个有事件:tick通道、abort通道。不能只在一个通道上接收,因为任何一个通道在有事件可接收之前都会阻塞。所以需要对两个通道进行多路复用。为了实现通道的多路复用,需要使用select语句:
select {case <-ch1: // ...case x := <-ch2: // ...case ch <- y: // ...default: // ...}
上面的select语句的每个case分支都尝试一个通道进行发送或接收,select语句将一直等待,直到某个分支的通道上可以执行相应的操作,此时该分支的代码被执行,其他分支将不会执行。
select中的default分支可选的,当有default分支时,在select等待的过程中,如果每一个case分支都没有事件,那么立即执行default分支。当没有default分支时,在select等待的过程中,如果每一个case分支都没有事件,那么将永远等待,直到有一个case分支有事件。
让我们在火箭发射程序中通过select语句同时监听两个通道是否有事件。
package mainimport "fmt"import "time"import "os"func main() { abort := make(chan struct{}) go func() { os.Stdin.Read(make([]byte, 1))// 读取单个字节 abort <- struct{}{} }() fmt.Println("Commencing countdown.") tick := time.Tick(1*time.Second) for countdown := 10; countdown > 0; countdown-- { fmt.Println(countdown) select { case <-tick: // 什么都不做case <-abort: fmt.Println("Launch aborted!") return } } launch()}func launch() { fmt.Println("lanuch")}
如果有多个case分支同时满足,select会随机选择一个,这样保证每一个通道有相同的机会被选中。
select语句的特殊应用
> 非阻塞通信
有时候我们试图在一个通道上发送或接收,但是一旦通道没有准备好,会发生阻塞。我们可以通过select语句实现仅在通道准备好的时候才执行发送或接收操作。
select {case <- ch: // 执行一些事情default: }
> select nil 通道
通道的零值是nil。在nil通道上发送和接收将永远阻塞,对于select语句,如果其分支中的通道是nil,该分支将永远不会被选择。我们可以select一个通道变量,当这个变量为nil是将不会触发对该通道的操作,而不为nil时将触发对该通道的操作。例中这个机制可以实现功能开关、超时处理、取消操作等。