如果❤️我的文章有帮助,欢迎点赞、关注。这是对我继续技术创作最大的鼓励。更多往期文章在我的个人博客
go 并发编程
Golang
提供 sync
、channel
两种实现方式支持协程(goroutine)并发。
例如我们举个并发下载资源
的例子,实现两种并发编程:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// ============== sync ==============
startTime1 := time.Now()
testGoroutine()
endTime1 := time.Now()
// 可以看到串行需要 3s 的下载操作,并发后,只需要 1s
fmt.Printf("======> Done! use time: %f",(endTime1.Sub(startTime1).Seconds()))
fmt.Println()
// ============== channel ==============
startTime2 := time.Now()
testChannel()
endTime2 := time.Now()
fmt.Printf("======> Done! use time: %f",(endTime2.Sub(startTime2).Seconds()))
fmt.Println()
}
// 使用 sync.WaitGroup 多个并发协程之间不能通信, 等待所有并发协程执行结束
var wg sync.WaitGroup
func testGoroutine() {
for i:=0; i<3; i++ {
wg.Add(1) // wg.Add() 为 wg 添加一个计数; wg.Done() 减去一个计数。
go downloadV1("a.com/time_"+string(i+'0')) // 启动新的协程并发执行 download 函数。
}
wg.Wait() // wg.Wait():等待所有的协程执行结束。
}
// 使用 channel 信道,可以在协程之间传递消息。等待并发协程返回消息。
var ch = make(chan string, 10) // 创建大小 10 的缓存通道
func testChannel() {
for i := 0; i < 3; i++ {
go downloadV2("a.com/" + string(i+'0'))
}
for i := 0; i < 3; i++ {
msg := <-ch // 等待信道返回消息。
fmt.Println("finish", msg)
}
}
// ================= 工具 V =================
func downloadV1(url string) {
fmt.Println("start to download", url)
time.Sleep(time.Second) // 模拟耗时
wg.Done()
}
func downloadV2(url string) {
fmt.Println("start to download", url)
time.Sleep(time.Second)
ch <- url // 将 url 发送给信道
}
下面我们就来详细说说 两种并发实现方式
sync.WaitGroup(等待组)
在 sync.WaitGroup
类型中,每个WaitGroup
都在内部维护着一个计数(初始值为0)。
如果多个并发协程间不需要通信
,那么非常适合使用 sync.WaitGroup
,等待所有并发协程执行结束。
sync.WaitGroup
常用的方法如下:
- (wg * WaitGroup) Add(delta int) 等待组的计数器 +1
- (wg * WaitGroup) Done() 等待组的计数器 -1
- (wg * WaitGroup) Wait() 当等待组计数器不等于 0 时阻塞直到变 0。
sync.WaitGroup(等待组)内部维持着一个计数器
,计数器的值可以通过方法调用实现计数器的增加和减少。当我们添加了 N 个并发任务进行工作时,就将等待组的计数器值增加 N。每个任务完成时,这个值减 1。同时,在另外一个 goroutine 中等待这个等待组的计数器值为 0 时,表示所有任务已经完成。
使用步骤如下:
- 协程开始时使用
wg.Add()
给等待组的计数 +1。 - 协程开始时使用
wg.Done()
给等待组的计数 -1。 wg.Done()
与wg.Add(-1)
完全等价。但如果将 wg 维护的计数更改成负数
,将产生panic(恐慌)
。- 协程调用了
wg.Wait()
时,- 此时 wg 维护的
计数为 0
,则此 wg.Wait() 此操作为空操作(noop)
; - 计数为
正整数
,此协程将进入阻塞状态
等待其他协程执行。当其它协程将此计数更改至 0 时,此协程 wg.Wait() 将返回。
- 此时 wg 维护的
channel (信道)
channel
就是 goroutine
之间的通信机制。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int。
Golang 提倡使用 通信的方法代替共享内存
,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。
声明通道时,需要指定被共享的数据的类型
。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
声明通道类型
声明通道时,需要指定被共享的数据的类型
,声明channel
语法如下:var 通道变量 chan 通道类型
- 通道类型:
通道内传输
的数据的类型 - chan 类型的空值是 nil,声明后需要配合
make
后才能使用。
创建通道
通道是引用类型,需要使用 make 进行创建,格式如下:
通道实例 := make(chan 数据类型)
数据类型:通道内被传输的数据的类型
。
通道实例:通过make
创建的通道句柄。
举个例子:
ch1 := make(chan int) // 创建一个整型类型的通道
ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式
type Equip struct{ /* 一些字段 */ }
ch2 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip
使用通道发送数据
通道创建后,就可以使用通道进行发送和接收操作。
通道发送数据的格式
通道的发送使用特殊的操作符<-
,将数据通过通道发送的格式为:
通道变量 <- 值
- 通道变量:通过make创建好的通道实例。
- 值:可以是变量、常量、表达式或者函数返回值等。
值的类型
必须与ch通道
的元素类型一致
。
使用 make 创建一个通道后,就可以使用<-向通道发送数据,代码如下:
// 创建一个空接口通道
ch := make(chan interface{})
// 将0放入通道中
ch <- 0
// 将hello字符串放入通道中
ch <- "hello"
发送将持续阻塞直到数据被接收
把数据往通道中发送时,如果接收方一直没接收
,那么发送操作将持续阻塞
。
Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,代码如下:
package main
func main() {
// 创建一个整型通道
ch := make(chan int)
// 尝试将0通过通道发送
ch <- 0
}
运行代码,报错:
fatal error: all goroutines are asleep - deadlock!
报错的意思是:运行时发现所有的 goroutine(包括main)都处于等待 goroutine。 也就是说所有 goroutine 中的 channel 并没有形成发送和接收对应的代码。
使用通道接收数据
通道接收同样使用<-
操作符,通道接收有如下特性:
-
通道的收发操作在不同的两个 goroutine 间进行。 由于通道的数据在没有接收方处理时,数据发送方会持续阻塞,因此通道的接收必定在另外一个 goroutine 中进行。
-
接收将持续阻塞直到发送方发送数据。 如果接收方接收时,通道中没有发送方发送数据,接收方也会发生阻塞,直到发送方发送数据为止。
-
每次接收一个元素。 通道一次只能接收一个数据元素。
通道的数据接收一共有以下 4 种写法:
阻塞接收数据
阻塞模式接收数据时,将接收变量作为<-操作符的左值,格式如下:
data := <-ch
执行该语句时将会阻塞,直到接收到数据并赋值给 data 变量。
非阻塞接收数据
使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下:
data, ok := <-ch
- data:表示接收到的数据。未接收到数据时,data 为通道类型的零值。
- ok:表示是否接收到数据。
非阻塞
的通道接收方法可能造成更高 CPU 占用
,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计数 channel 进行,可以参见后面的内容。
接收任意数据,忽略接收的数据
阻塞接收数据后,忽略从通道返回的数据,格式如下:
<-ch
执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。
使用通道做并发同步的写法,可以参考下面的例子:
package main
import (
"fmt"
)
func main() {
// 构建一个通道
ch := make(chan int)
// 开启一个并发匿名函数
go func() {
fmt.Println("start goroutine")
// 通过通道通知main的goroutine
ch <- 0
fmt.Println("exit goroutine")
}()
fmt.Println("wait goroutine")
// 等待匿名goroutine
<-ch
fmt.Println("all done")
}
// 输出如下:
// wait goroutine
// start goroutine
// exit goroutine
// all done