# Go 快速入门(五)- 并发利器

# 本章学习目标

前面掌握过了一些基础知识,我们也动手写过了一个web爬虫程序。本章节我们再来学习一下go语言关于并发的另一重要特性channel

# channel(通道)

我们已经了解过了go语言是通过协程来实现并发。同时,我们也知道协程是独立执行的,它们之间没有通信。前面章节我们也提到过:不通过共享来通信,而要通过通信来共享。go语言如何通过通信来共享呢?

go语言有一种特殊的类型channel,提供了一个很好的通信机制。channel就像一个可以用于发送类型化数据的管道(可以比作 Unix shells 中的双向管道),负责协程之间的通信。

# 1、channel(无缓冲通道)

channel声明方式:

var identifier chan datatype

未初始化的channel的值是nil。channel也是引用类型,所以我们使用 make()函数来给它分配内存。

channel只能传输一种类型的数据,比如 chan int 或者 chan string,所有的类型都可以用于通道,空接口 interface{} 也可以。甚至可以(有时非常有用)创建通道的通道。

channel通过操作符<-来接收和发送数据:

ch <- v    // 发送v到channel ch.
v := <-ch  // 从ch中接收数据,并赋值给v

操作符<-直观的标示了数据的传输:信息按照箭头的方向流动。

  • channel发送数据

    ch := make(chan int)
    ch <- 4		// 将整形4发送到通道
    
  • channel接收数据

    x := <- ch 	// 从通道中接收数据并存储到x变量
    

默认情况下,channel接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得goroutines同步变的更加的简单,而不需要显式的锁。所谓阻塞,也就是如果读取(value := <-ch)它将会被阻塞,直到有数据接收。其次,任何发送(ch<-4)将会被阻塞,直到数据被读出。无缓冲channel是在多个goroutine之间同步很棒的工具。

我们把这些应用到我们的例子中来:

package main

import "fmt"

func sum(a []int, c chan int) {
    total := 0
    for _, v := range a {
        total += v
    }
    c <- total  // send total to c(阻塞:等待数据被读出)
}

func main() {
    a := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(a[:len(a)/2], c)
    go sum(a[len(a)/2:], c)
    x, y := <-c, <-c  // receive from c(阻塞:等待数据写入)

    fmt.Println(x, y, x + y)
}
// 结果:
// go run main.go
//-5 17 12

# 2、Buffered Channels(缓冲通道)

一个无缓冲通道只能包含 1 个元素,有时显得很局限。不过go也允许指定channel的缓冲大小。很简单,就是channel可以存储多个元素。

ch:=make(chan bool, 4)创建了可以存储4个元素的布尔型channel。在这个channel中,前4个元素可以无阻塞的写入。当写入第5个元素时,代码将会阻塞,直到其他goroutinechannel 中读取一些元素,腾出空间。

我们看一下下面这个例子,大家可以在自己本机测试一下,修改相应的value值。

package main

import "fmt"

func main() {
    c := make(chan int, 2) // 修改2为1就报错,修改2为3可以正常运行
    c <- 1
    c <- 2
    fmt.Println(<-c)
    fmt.Println(<-c)
}
// 修改为1报如下的错误:
// fatal error: all goroutines are asleep - deadlock!

# 3、通道遍历与关闭

上面这个例子中,我们需要读取两次c,这样不是很方便,go考虑到了这一点,所以也可以通过range,像操作slice或者map一样操作缓存类型的channel。

for v := range ch {
	fmt.Printf("The value is %v\n", v)
}

上面的代码指定通道中读取数据直到通道关闭,才继续执行下边的代码。很明显,另外一个协程必须写入 ch(不然代码就阻塞在 for 循环了),而且必须在写入完成后才关闭。

我们来看一下如何使用channel实现一个fibonacci 函数:

package main

import (
    "fmt"
)

func fibonacci(n int, c chan int) {
    x, y := 1, 1
    for i := 0; i < n; i++ { // 循环n次,每次向通道中发送fibonacci数列中一项数据,并计算下一项
        c <- x
        x, y = y, x + y
    }
    close(c)
}

func main() {
    c := make(chan int, 10) // 申明一个能缓冲10个整形数据的channel
    go fibonacci(cap(c), c) // cap(c):缓冲通道的长度
    for i := range c {		// 遍历通道数据
        fmt.Println(i)
    }
}

// 结果:
1
1
2
3
5
8
13
21
34
55

for i := range c能够不断的读取channel里面的数据,直到该channel被显式的关闭。上面代码我们看到可以显式的关闭channel,生产者通过内置函数close关闭channel。关闭channel之后就无法再发送任何数据了,在消费方可以通过语法v, ok := <-ch测试channel是否被关闭。如果ok返回false,那么说明channel已经没有任何数据并且已经被关闭。

注意:记住应该在生产者的地方关闭channel,而不是消费的地方去关闭它,这样容易引起panic 另外记住一点的就是channel不像文件之类的,不需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环时。只有发送者需要关闭通道,接收者永远不会需要。

# 4、通道方向

通道类型可以用注解来表示它只发送或者只接收:

var sendOnly chan<- int 	// 通道只能发送数据
var recvOnly <-chan int		// 通道只能接收数据

只接收的通道(chan<- T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了,所以对只接收通道是没有意义的。通道创建的时候都是双向的,但也可以分配有方向的通道变量,就像以下代码:

var c = make(chan int) // 申明双向通道
go send(c)
go recv(c)

func send(ch chan<- int){	// 函数参数为只能发送数据通道
	for { ch <- 1 }
}

func recv(ch <-chan int) {	// 函数参数为只能接收数据通道
	for { <-ch }
}

# 5、 select

从不同的并发执行的协程中获取值可以通过关键字select来完成,它和switch控制语句非常相似也被称作通信开关;它的表现像轮询机制;select监听进入通道的数据,也可以是用通道发送值的时候。

select {
case u:= <- ch1:
        ...
case v:= <- ch2:
        ...
        ...
default:
      // 当ch1、ch2阻塞的时候执行这里
}

``select默认是阻塞的,只有当监听的通道中有发送或接收可以进行时才会运行,当多个通道都准备好的时候,select是随机的选择一个执行的。default 语句是可选的,可以确保发送不被阻塞;在任何一个 case 中执行 break 或者 returnselect就结束了。

我们来再来看一下如何使用通道换一种方式实现一个fibonacci 函数:

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 1, 1
    for {
        select {
        case c <- x:
            x, y = y, x + y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}
// 结果:
1
1
2
3
5
8
13
21
34
55
quit

# 6、 超时

有时候会出现goroutine阻塞的情况,那么我们如何避免整个程序进入阻塞的情况呢?我们可以利用select接合定时器(Timer)或者计时器(Ticker)来设置超时。

time 包中有一些有趣的功能可以和通道组合使用。其中就包含了计时器 time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值。

// 计时器 time.Tick
type Ticker struct {
    C <-chan Time // 计时向通道发送消息
    ...
}

// 定时器
func After(d Duration) <-chan Time // 定时向通道发送消息

我们来看下面的例子:

package main

import (
	"fmt"
	"time"
)

func main() {
	tick := time.Tick(1e8)
	boom := time.After(5e8)
	for {
		select {
		case <-tick:
			fmt.Println("tick.")
		case <-boom:
			fmt.Println("BOOM!")
			return
		default:
			fmt.Println("    .")
			time.Sleep(5e7)
		}
	}
}
// 结果:
    .
    .
    .
tick.
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
BOOM!

# 本章试练