文章目录
- 前言
 - 协程
 - goroutine 调度
 - 使用 goroutine
 
- 通道
 - 无缓冲通道
 - 有缓冲通道
 - 单向通道
 
- select 多路复用
 - sync
 - sync.WaitGroup
 - sync.Mutex
 - sync.RWMutex
 - sync.Once
 - sync.Map
 
项目代码地址:05-GoroutineChannelSync
前言
Go 1.22 版本于不久前推出,更新的新特性可以参考官文。从此篇章开始,后续 go 版本更为 1.22.0 及以上,自行官网下载。
协程
常见的并发模型
- 线程与锁模型
 - Actor 模型
 - CSP 模型
 - Fork 与 Join 模型
 
Go 语言天生支持并发,主要通过基于通信顺序过程(Communicating Sequential Processes, CSP)的 goroutine 和通道 channel 实现,同时也支持传统的多线程共享内存的并发方式。
goroutine 会以一个很小的栈开始其生命周期,一般只需要 2 KB。goroutine 由 Go 运行时(runtime)调度,Go 运行时会智能地将 m 个 goroutine 合理的分配给 n 个操作系统线程,实现类似 m:n 的调度机制,不再需要开发者在代码层面维护线程池。
goroutine 调度
操作系统线程的调度:操作系统线程在被内核调度时挂起当前执行的线程,并将它的寄存器内容保存到内存中,然后选出下一次要执行的线程,并从内存中恢复该线程的寄存器信息,恢复现场并执行该线程,这样就完成一次完整的线程上下文切换。
goroutine 调度:区别于操作系统线程的调度,goroutine 调度在 Go 语言运行时层面实现,完全由 Go 语言本身实现,按照一定规则将所有的 goroutine 调度到操作系统线程上执行。
goroutine 调度器采用 GPM 调度模型,如下所示:

-  
G:表示 goroutine,每执行一次go f()就创建一个 G,包含要执行的函数和上下文信息。
 -  
全局队列(Global Queue):存放等待运行的 G。
 -  
P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。GOMAXPROCS 默认 CPU 核心数,指定需要使用多少个操作系统线程来同时执行代码。
 -  
P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。
 -  
M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
 -  
Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。
 
参考:https://www.liwenzhou.com/posts/Go/concurrence/
使用 goroutine
启动 goroutine 只需要在函数前加 go 关键字:
func f(msg string) {
	for i := 0; i < 3; i++ {
		fmt.Println(msg, ":", i)
	}
}
func functino01() {
	go f("goroutine")
	go func(msg string) {
		fmt.Println(msg)
	}("going")
	time.Sleep(time.Second)
	fmt.Println("done")
}
 
going
goroutine : 0
goroutine : 1
goroutine : 2
done
 
使用 time.Sleep 等待协程 goroutine 的运行不优雅,同时也不够精确,后续会采用 sync 包提供的常用并发原语,对协程的运行状态进行控制。
在 go 1.22.0 版本后,如下使用可正常在协程闭包函数中捕获外部的变量,而不是每个 loop 仅一份变量了。
参考:https://zhuanlan.zhihu.com/p/674158675
func function05() {
	for i := 0; i < 5; i++ {
		go func() {
			fmt.Println(i)
		}() // 正常输出 0~4 中的数字,而不是全是 4
	}
	time.Sleep(time.Second)
}
 
通道
通道 channel 是一种特殊类型,遵循先入先出(FIFO)的特性,用于 goroutine 之间的同步、通信。
声明 channel 语法如下:
chan T 		// 双向通道
chan <- T  	// 只能发送的通道
<- chan T	// 只能接收的通道
 
channel 是一个引用类型,在被初始化前值为 nil,需要使用 make 函数进行初始化。缓冲区大小可选:
- 有缓冲通道:
make(chan T, capacity int) - 无缓冲通道:
make(chan T)、make(chan T, 0) 
通道共有三种操作,发送、接受、关闭:
- 定义通道
 
ch := make(chan int)
 
- 发送一个值到通道中
 
ch <- 10
 
- 从通道中接收值
 
v := <- ch 		// 从 ch 接收值赋给 v
v, ok := <- ch 	// 多返回值,ok 表示通道是否被关闭
<- ch			// 从 ch 接收值,忽略结果
 
- 关闭通道
 
close(ch)
 
tips
- 对一个关闭的通道发送值会导致 panic
 - 对一个关闭的通道一直获取值会直到通道为空
 - 重复关闭通道会 panic
 - 通道值可以被垃圾回收
 - 对一个关闭并且没值的通道接收值,会获取对应类型零值
 
无缓冲通道
又称阻塞通道,同步通道。
无缓冲通道必须至少有一个接收方才能发送成功,即发送操作会阻塞,直到另一个 goroutine 在该通道上接收。相反,接收操作先执行,也会阻塞至有 goroutine 往通道发送数据。
发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输。
等待一秒后,主程才能获取到 ch 中的数据
func function02() {
	ch := make(chan int, 0)
	go func() {
		time.Sleep(time.Second)
		ch <- 1
	}()
	v := <-ch
	fmt.Println(v)
}
 
等待一秒后,协程中才能获取到 ch 中的数据
func function03() {
	ch := make(chan int)
	go func() {
		v := <-ch
		fmt.Println(v)
	}()
	time.Sleep(time.Second)
	ch <- 1
	time.Sleep(time.Second)
}
 
有缓冲通道
又称异步通道
有缓冲通道可以通过 cap 获取通道容量,len 获取通道内元素数量。如果通道元素数量达到上限,那么继续往通道发送数据也会被阻塞,直至有 goroutine 从通道获取数据。
通常选择使用 for range 循环从通道中接收值,当通道被关闭后,通道内所有值被接收完毕后会自动退出循环。
func function04() {
	ch := make(chan int, 2)
	fmt.Println(len(ch), cap(ch)) // 0 2
	ch <- 1
	ch <- 2
	go func() {
		for v := range ch {
			fmt.Println(v)
		}
	}() // 1 2 3
	ch <- 3
	time.Sleep(time.Second)
}
 
- 多返回值模式
 
基本格式:value, ok := <- ch
ok :如果为 false 表示 value 为无效值(通道关闭后的默认零值);如果为 true 表示 value 为通道中的实际数据值。
func function06() {
	ch := make(chan int, 1)
	ch <- 1
	close(ch)
	go func() {
		for {
			if v, ok := <-ch; ok {
				fmt.Println(v)
			} else {
				break
			}
		}
	}()
	time.Sleep(time.Second)
}
 
单向通道
通常会在函数参数中限制通道只能用于接收或发送。控制通道在函数中只读或只写,提升程序的类型安全。
// Producer 生产者
func Producer() <-chan int {
	ch := make(chan int, 1)
	go func() {
		for i := 0; i < 3; i++ {
			ch <- i
		}
		close(ch) // 任务完成关闭通道
	}()
	return ch
}
// Consumer 消费者
func Consumer(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}
func function07() {
	ch := Producer()
	sum := Consumer(ch)
	fmt.Println(sum) // 3
}
 
在函数传参及赋值过程中,全向通道可以转为单向通道,但单向通道不可转为全向通道。
func function08() {
	ch := make(chan int, 1)
	go func(ch chan<- int) {
		for i := 0; i < 2; i++ {
			ch <- i
		}
		close(ch)
	}(ch)
	for v := range ch {
		fmt.Println(v)
	} // 0 1
}
 
Go 语言采用的并发模型是 CSP,提倡通过通信实现内存共享,而不是通过共享内存实现通信。
CSP 模型由并发执行的实体所组成,实体之间通过发送消息进行通信。
Go 通过 channel 实现 CSP 通信模型,主要用于 goroutine 之间的消息传递和事件通知。
select 多路复用
在从多个通道获取数据的场景下, 需要使用 select 选择器,使用方式类似于 switch 语句,有一系列的 case 分支和一个默认分支。
基本格式:
select {
case <- ch1:
	...
case data := <- ch2:
	...
case ch3 <- 3:
	...
default:
	...
}
 
select 会一直等待,直到其中某个 case 的通信操作完成,执行该 case 语句。
- 可处理一个或多个 channel 的接收和发送
 - 如果多个 case 同时满足,select 随机选择一个执行
 
func function09() {
	now := time.Now()
	ch1 := make(chan string)
	ch2 := make(chan string)
	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "one"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "two"
	}()
	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println(msg1)
		case msg2 := <-ch2:
			fmt.Println(msg2)
		}
	} // one two
	fmt.Println(time.Since(now)) // 2.0003655s
}
 
sync
在上述示例中,使用了大量的 time.Sleep 等待 goroutine 的结束。但还有更好的方式,使用内置的 sync 包管理协程的运行状态。
sync.WaitGroup
使用 wait group 等待多个协程完成,如果 WaitGroup 计数器恢复为 0,即所有协程的工作都完成:
var (
	x  int64
	wg sync.WaitGroup
)
func function10() {
	add := func() {
		defer wg.Done()
		for i := 0; i < 5000; i++ {
			x = x + 1
		}
	}
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}
 
使用 go run -race main.go 可查看代码是否存在竞态问题,上述代码存在两个 goroutine 操作同一个资源,输出结果不定。
| 方法 | 作用 | 
|---|---|
WaitGroup.Add(delta) | 计数器值 +delta,建议在 goroutine 外部累加计数器 | 
WaitGroup.Done() | 计数器值 -1 | 
WaitGroup.Wait() | 阻塞代码,直到计数器值减为 0 | 
注意:WaitGroup 对象不是一个引用类型,在通过函数传值的时候需要使用地址。
sync.Mutex
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。
| 方法 | 作用 | 
|---|---|
Mutex.Lock() | 获取互斥锁 | 
Mutex.Unlock() | 释放互斥锁 | 
使用互斥锁对代码修改如下:
var (
	x   int64
	wg  sync.WaitGroup
	mtx sync.Mutex
)
func function11() {
	add := func() {
		defer wg.Done()
		for i := 0; i < 5000; i++ {
			mtx.Lock()	// 修改数据前,加锁
			x = x + 1
			mtx.Unlock() // 修改完数据后,释放锁
		}
	}
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x) // 10000
}
 
sync.RWMutex
读写互斥锁,某些场景中读操作较为频繁,不涉及对数据的修改时,读写锁可能是更好的选择。
| 方法 | 作用 | 
|---|---|
RWMutex.Lock() | 获取写锁 | 
RWMutex.Unlock() | 释放写锁 | 
RWMutex.RLock() | 获取读锁 | 
RWMutex.RUnlock() | 释放读锁 | 
读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。
sync.Once
在高并发场景下,可以使用 sync.Once,保证操作只执行一次。当且仅当第一次访问某个变量时,进行初始化。变量初始化过程中,所有读都被阻塞,直到初始化完成。
sync.Once 其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会被执行多次。
sync.Once 仅提供了一个方法 Do,参数 f 是对象初始化函数。
func (o *Once) Do(f func())
单例模式:
type Singleton struct{}
var (
	instance *Singleton
	once     sync.Once
	wg       sync.WaitGroup
)
func GetInstance() *Singleton {
	once.Do(func() {
		instance = &Singleton{}
		fmt.Println("Get Instance")
	})
	return instance
}
func function12() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = GetInstance()
		}()
	} // Get Instance
	wg.Wait()
}
 
程序只会输出一次 Get Instance,说明 sync.Once 是线程安全的,支持并发,仅会执行一次初始化数据的函数。
sync.Map
Go 内置的 map 不是并发安全的,下述代码多个 goroutine 对 map 操作会出现竞态问题,报错不能正常运行。
var (
	mp = make(map[string]interface{})
	wg sync.WaitGroup
)
func function13() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			key := strconv.Itoa(i)
			mp[key] = i
			fmt.Println(key, mp[key])
		}()
	}
	wg.Wait()
}
 
sync.Map 是并发安全版 map,不过操作数据不再是直接通过 [] 获取插入数据,而需要使用其提供的方法。
| 方法 | 作用 | 
|---|---|
Map.Store(key, value interface{}) | 存储 key-value 数据 | 
Map.Load(key interface{}) (value interface{}, ok bool) | 查询 key 对应的 value | 
Map.LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) | 查询 key 对应的 value,如果不存在则存储 key-value 数据 | 
Map.LoadAndDelete(key interface{}) (value interface{}, loaded bool) | 查询并删除 key | 
Map.Delete(key interface{}) | 删除 key | 
Map.Range(f func(key, value interface{}) bool) | 对 map 中的每个 key-value 依次调用 f | 
使用 sync.Map 修改上述代码,即可正确运行。
func function14() {
	m := sync.Map{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			key := strconv.Itoa(i)
			m.Store(key, i)
			v, ok := m.Load(key)
			if ok {
				fmt.Println(key, v)
			}
		}()
	}
	wg.Wait()
}
 
LoadOrStore、LoadAndDelete 示例代码:
// LoadOrStore、LoadAndDelete
func function15() {
	m := sync.Map{}
	//m.Store("cauchy", 19)
	v, ok := m.LoadOrStore("cauchy", 20)
	fmt.Println(v, ok) // 注释: 20 false;没注释: 19 true
	v, ok = m.Load("cauchy")
	fmt.Println(v, ok) // 注释: 20 true;没注释: 19 true
	v, ok = m.LoadAndDelete("cauchy")
	fmt.Println(v, ok) // 注释: 20 true;没注释: 19 true
	v, ok = m.Load("cauchy")
	fmt.Println(v, ok) // nil false
}
 
Range 示例代码:
Map.Range 可无序遍历 sync.Map 中的所有 key-value 键值对,如果返回 false 则终止迭代。
func function16() {
	m := sync.Map{}
	m.Store(3, 3)
	m.Store(2, 2)
	m.Store(1, 1)
	cnt := 0
	m.Range(func(key, value any) bool {
		cnt++
		fmt.Println(key, value)
		return true
	})
	fmt.Println(cnt) // 3
}
                










![LeetCode 刷题 [C++] 第141题.环形链表](https://img-blog.csdnimg.cn/direct/56a7f28ba3da4cc58232b5e1b1fc7968.png)







