并发编程

Golang 提供 sync、channel 两种实现方式支持协程(goroutine)并发 & sync 讲解

Posted by 锐玩道 on May 29, 2021

如果❤️我的文章有帮助,欢迎点赞、关注。这是对我继续技术创作最大的鼓励。更多往期文章在我的个人博客

go 并发编程

Golang 提供 syncchannel 两种实现方式支持协程(goroutine)并发。

例如我们举个并发下载资源的例子,实现两种并发编程:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main()  {
	// ============== sync ==============
	startTime1 := time.Now()

	testGoroutine()

	endTime1 := time.Now()
    // 可以看到串行需要 3s 的下载操作,并发后,只需要 1s
	fmt.Printf("======> Done! use time: %f",(endTime1.Sub(startTime1).Seconds()))
	fmt.Println()

	// ============== channel  ==============
	startTime2 := time.Now()

	testChannel()

	endTime2 := time.Now()
	fmt.Printf("======> Done! use time: %f",(endTime2.Sub(startTime2).Seconds()))
	fmt.Println()
}

// 使用 sync.WaitGroup 多个并发协程之间不能通信, 等待所有并发协程执行结束
var wg sync.WaitGroup
func testGoroutine() {
	for i:=0; i<3; i++ {
		wg.Add(1) // wg.Add() 为 wg 添加一个计数; wg.Done() 减去一个计数。
		go downloadV1("a.com/time_"+string(i+'0'))	// 启动新的协程并发执行 download 函数。
	}

	wg.Wait()	// wg.Wait():等待所有的协程执行结束。
}

// 使用 channel 信道,可以在协程之间传递消息。等待并发协程返回消息。
var ch = make(chan string, 10) // 创建大小 10 的缓存通道
func testChannel() {
	for i := 0; i < 3; i++ {
		go downloadV2("a.com/" + string(i+'0'))
	}
	for i := 0; i < 3; i++ {
		msg := <-ch // 等待信道返回消息。
		fmt.Println("finish", msg)
	}
}

// ================= 工具 V  =================
func downloadV1(url string) {
	fmt.Println("start to download", url)
	time.Sleep(time.Second) // 模拟耗时

	wg.Done()
}

func downloadV2(url string) {
	fmt.Println("start to download", url)
	time.Sleep(time.Second)
	ch <- url // 将 url 发送给信道
}

下面我们就来详细说说 两种并发实现方式

sync.WaitGroup(等待组)

sync.WaitGroup 类型中,每个WaitGroup都在内部维护着一个计数(初始值为0)。 如果多个并发协程间不需要通信,那么非常适合使用 sync.WaitGroup,等待所有并发协程执行结束。 sync.WaitGroup 常用的方法如下:

  • (wg * WaitGroup) Add(delta int) 等待组的计数器 +1
  • (wg * WaitGroup) Done() 等待组的计数器 -1
  • (wg * WaitGroup) Wait() 当等待组计数器不等于 0 时阻塞直到变 0。

sync.WaitGroup(等待组)内部维持着一个计数器,计数器的值可以通过方法调用实现计数器的增加和减少。当我们添加了 N 个并发任务进行工作时,就将等待组的计数器值增加 N。每个任务完成时,这个值减 1。同时,在另外一个 goroutine 中等待这个等待组的计数器值为 0 时,表示所有任务已经完成。 使用步骤如下:

  • 协程开始时使用 wg.Add() 给等待组的计数 +1。
  • 协程开始时使用 wg.Done() 给等待组的计数 -1。
  • wg.Done()wg.Add(-1) 完全等价。但如果将 wg 维护的计数更改成负数,将产生panic(恐慌)
  • 协程调用了 wg.Wait() 时,
    • 此时 wg 维护的计数为 0,则此 wg.Wait() 此操作为空操作(noop)
    • 计数为正整数,此协程将进入阻塞状态等待其他协程执行。当其它协程将此计数更改至 0 时,此协程 wg.Wait() 将返回。

channel (信道)

channel 就是 goroutine 之间的通信机制。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int。

Golang 提倡使用 通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。

声明通道时,需要指定被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。

声明通道类型

声明通道时,需要指定被共享的数据的类型,声明channel语法如下:var 通道变量 chan 通道类型

  • 通道类型:通道内传输的数据的类型
  • chan 类型的空值是 nil,声明后需要配合 make 后才能使用。

创建通道

通道是引用类型,需要使用 make 进行创建,格式如下: 通道实例 := make(chan 数据类型)

数据类型:通道内被传输的数据的类型。 通道实例:通过make创建的通道句柄。

举个例子:

ch1 := make(chan int)                 // 创建一个整型类型的通道
ch2 := make(chan interface{})         // 创建一个空接口类型的通道, 可以存放任意格式

type Equip struct{ /* 一些字段 */ }
ch2 := make(chan *Equip)             // 创建Equip指针类型的通道, 可以存放*Equip

使用通道发送数据

通道创建后,就可以使用通道进行发送和接收操作。

通道发送数据的格式

通道的发送使用特殊的操作符<-,将数据通过通道发送的格式为: 通道变量 <- 值

  • 通道变量:通过make创建好的通道实例。
  • 值:可以是变量、常量、表达式或者函数返回值等。值的类型必须与ch通道的元素类型一致

使用 make 创建一个通道后,就可以使用<-向通道发送数据,代码如下:

// 创建一个空接口通道
ch := make(chan interface{})
// 将0放入通道中
ch <- 0
// 将hello字符串放入通道中
ch <- "hello"

发送将持续阻塞直到数据被接收

把数据往通道中发送时,如果接收方一直没接收,那么发送操作将持续阻塞。 Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,代码如下:

package main
func main() {
    // 创建一个整型通道
    ch := make(chan int)
    // 尝试将0通过通道发送
    ch <- 0
}

运行代码,报错:

fatal error: all goroutines are asleep - deadlock!

报错的意思是:运行时发现所有的 goroutine(包括main)都处于等待 goroutine。 也就是说所有 goroutine 中的 channel 并没有形成发送和接收对应的代码。

使用通道接收数据

通道接收同样使用<-操作符,通道接收有如下特性:

  • 通道的收发操作在不同的两个 goroutine 间进行。 由于通道的数据在没有接收方处理时,数据发送方会持续阻塞,因此通道的接收必定在另外一个 goroutine 中进行。

  • 接收将持续阻塞直到发送方发送数据。 如果接收方接收时,通道中没有发送方发送数据,接收方也会发生阻塞,直到发送方发送数据为止。

  • 每次接收一个元素。 通道一次只能接收一个数据元素。

通道的数据接收一共有以下 4 种写法:

阻塞接收数据

阻塞模式接收数据时,将接收变量作为<-操作符的左值,格式如下: data := <-ch

执行该语句时将会阻塞,直到接收到数据并赋值给 data 变量。

非阻塞接收数据

使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下: data, ok := <-ch

  • data:表示接收到的数据。未接收到数据时,data 为通道类型的零值。
  • ok:表示是否接收到数据。

非阻塞的通道接收方法可能造成更高 CPU 占用,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计数 channel 进行,可以参见后面的内容。

接收任意数据,忽略接收的数据

阻塞接收数据后,忽略从通道返回的数据,格式如下: <-ch

执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。

使用通道做并发同步的写法,可以参考下面的例子:

package main
import (
    "fmt"
)
func main() {
    // 构建一个通道
    ch := make(chan int)
    // 开启一个并发匿名函数
    go func() {
        fmt.Println("start goroutine")
        // 通过通道通知main的goroutine
        ch <- 0
        fmt.Println("exit goroutine")
    }()
    fmt.Println("wait goroutine")
    // 等待匿名goroutine
    <-ch
    fmt.Println("all done")
}

// 输出如下:
// wait goroutine
// start goroutine
// exit goroutine
// all done