1 协程基础
1.1 协程定义(Goroutine)
- 概念:Go 语言特有的轻量级线程,由 Go 运行时(runtime)管理,相比系统线程(Thread),创建和销毁成本极低,占用内存小(初始 2KB)。协程是 Go 程序中最基本的并发执行单元。
-
创建方式:使用
go
关键字启动一个协程func main() { // 匿名函数直接启动协程 go func() { fmt.Println("Hello from goroutine!") }() // 调用已定义函数启动协程 go func1() go func2() time.Sleep(time.Second) // 等待协程执行,否则主协程退出导致所有协程终止 fmt.Println("主协程退出") }
1.2 协程调度模型GMP
Go 调度器采用 Goroutine-Machine-Processor (GMP) 模型,核心组件包括:
- G (Goroutine):协程的抽象,包含执行栈、程序计数器等信息。
- M (Machine): 对应操作系统线程,实际执行代码的实体。
- P (Processor):逻辑处理器,持有运行队列(Local Queue)和 G 上下文,必须绑定 M 才能执行 G。
M 是 “执行任务的实体”,是唯一能运行 Go 代码的载体。G(任务)本身只是一段代码逻辑,必须依赖 M(操作系统线程)才能在 CPU 上执行
M和P是绑定关系,必须成对出现
1.2.1 协程创建
- 当调用
go func()
时,创建一个新的 G 对象,放入当前 P 的 Local Queue。 - 若 Local Queue 已满(默认 256 个 G),将一半 G 转移到全局队列(Global Queue)。
1.2.2 协程执行
- M 从绑定的 P 的 Local Queue 获取 G 执行。
- 若 Local Queue 为空,从 Global Queue 批量获取 G(通常为 P 的 GOMAXPROCS/2)。
- 若 Global Queue 也为空,从其他 P 的 Local Queue 窃取(Work Stealing) 一半 G。
1.2.3 协程阻塞 / 唤醒
- 当 G 执行系统调用(如 I/O)时,M 与 P 解绑,P 可被其他 M 接管继续执行队列中的 G。
如果 M 因系统调用被阻塞时,P 继续绑定 M,会导致以下问题:
- P 无法工作:P 的本地队列中可能有大量就绪的 G,但由于 M 被阻塞,这些 G 无法执行。
- CPU 核心浪费:如果 P 对应一个 CPU 核心,该核心将处于闲置状态,即使还有其他任务可执行。
- 因此,当 G 执行系统调用时,调度器会 解绑 M 和 P,允许 P 继续工作,避免 CPU 资源浪费
- 系统调用返回后,G 重新加入某个 P 的队列等待执行。
2 并发模式
2.1 共享内存并发
多个协程通过共享变量访问数据,需使用同步原语(如sync.Mutex
、sync.RWMutex
)保护临界区
var (
counter int
mu sync.Mutex
)
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
increment()
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 输出1000,无竞争
}
2.2 CSP 并发(通过通道通信)
使用channel
实现协程间通信和同步,遵循 “不要通过共享内存来通信,而要通过通信来共享内存” 原则
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}
func consumer(ch <-chan int) {
for num := range ch {
fmt.Println("Received:", num)
}
}
func main() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}
2.3 并发任务控制
// 普通的协程创建方法:
go func() {
// your code1
}()
go func() {
// your code2
}()
// go on
这段 Go 代码的执行顺序如下:
-
启动 goroutine 1:主协程创建并启动第一个匿名函数(
// your code1
),该函数在后台异步执行。 -
启动 goroutine 2:主协程紧接着创建并启动第二个匿名函数(
// your code2
),同样在后台异步执行。 -
主协程继续执行:主协程不会等待这两个 goroutine 完成,而是立即继续执行
// go on
之后的代码。 -
并行执行 goroutine:
// your code1
和// your code2
的执行顺序取决于调度器,可能并行或交替执行,但它们的完成顺序不确定。由于主协程未等待它们,若主协程提前结束(例如程序退出),这两个 goroutine 可能被强制终止。
2.3.1 sync.WaitGroup
wg.Wait()会阻塞直到2个协程执行完后
go func() {
// func1
wg.Done()
}()
go func() {
// func2
wg.Done()
}()
wg.Wait()
// go on
这段 Go 代码的执行顺序如下:
-
启动 goroutine 1:主协程创建并启动第一个匿名函数(
func1
),该函数在后台异步执行。 -
启动 goroutine 2:主协程紧接着创建并启动第二个匿名函数(
func2
),同样在后台异步执行。 -
主协程阻塞:主协程执行
wg.Wait()
,进入阻塞状态,等待所有被等待的 goroutine 完成。 -
并行执行 goroutine:
func1
和func2
的执行顺序取决于调度器,可能并行或交替执行,但它们的完成顺序不确定。每个 goroutine 在完成任务后调用wg.Done()
通知等待组。 -
恢复主协程:当所有被等待的 goroutine(即
func1
和func2
)都调用了wg.Done()
后,wg.Wait()
返回,主协程继续执行后续代码(// go on
)。
主协程 | goroutine 1 | goroutine 2
---------------------------------------------------------------
wg.Add(2) | |
启动func1 | 开始执行func1 |
启动func2 | | 开始执行func2
wg.Wait()阻塞 | ... | ...
| 执行完毕 |
| wg.Done() |
| | 执行完毕
| | wg.Done()
wg.Wait()返回 | |
继续执行后续代码
2.3.2 errgroup.Group
var g errgroup.Group
g.Go(func() error {
// 任务1:可能返回错误
return nil
})
g.Go(func() error {
// 任务2:可能返回错误
return errors.New("task failed")
})
if err := g.Wait(); err != nil {
// 处理首个错误(如任务2失败)
}
执行顺序:
-
主协程启动两个 goroutine 并行执行
-
若其中一个 goroutine 返回非 nil 错误:
-
自动调用内置的
context.CancelFunc
-
向其他 goroutine 发送取消信号(通过 context)
-
g.Wait()
立即返回首个错误
-
-
所有 goroutine(包括未出错的)需主动检查 context 状态并提前退出
2.3.3 对比
特性 | errgroup.Group | sync.WaitGroup |
---|---|---|
错误处理 | 自动捕获首个非 nil 错误并终止所有 goroutine | 不处理错误 |
执行控制 | 首个错误发生后,其他 goroutine 会被 CancelFunc 终止 | 所有 goroutine 独立运行至完成 |
结果聚合 | 可返回首个错误,用于统一错误处理 | 无内置错误传递机制 |
取消机制 | 支持通过 context 传播取消信号 | 无内置取消机制 |
3. 并发panic处理
协程中发生 panic 若未被捕获,仅会导致该协程崩溃,不会影响其他协程和主程序,但可能导致资源泄漏
3.1 普通goroutine的panic处理
对于普通的goroutine,可以在协程函数内部使用defer和recover组合来捕获panic。defer语句会将函数推迟到外层函数返回之前执行,而recover函数用于捕获panic,它只能在defer修饰的函数中有效
func worker() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in worker:", r)
}
}()
// 可能触发panic的代码
var data map[string]int
data["key"] = 1 // 触发panic: assignment to entry in nil map
}
func main() {
go worker()
time.Sleep(time.Second)
fmt.Println("Main continues")
}
3.2 使用 sync.WaitGroup 时的 panic 处理
sync.WaitGroup常用于等待一组goroutine完成任务。在这种场景下,每个goroutine内部仍需使用defer和recover捕获panic,并且可以通过额外的机制将panic信息传递给主协程。
import (
"fmt"
"sync"
)
type Result struct {
Err error
Data interface{}
}
func worker(id int, wg *sync.WaitGroup, resultChan chan<- Result) {
defer func() {
if r := recover(); r != nil {
resultChan <- Result{Err: fmt.Errorf("panic in worker %d: %v", id, r)}
}
}()
// 模拟可能触发panic的任务
if id == 2 {
panic("simulated panic")
}
resultChan <- Result{Data: fmt.Sprintf("Worker %d finished", id)}
wg.Done()
}
func main() {
var wg sync.WaitGroup
resultChan := make(chan Result)
numWorkers := 3
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &wg, resultChan)
}
go func() {
wg.Wait()
close(resultChan)
}()
for result := range resultChan {
if result.Err != nil {
fmt.Println(result.Err)
} else {
fmt.Println(result.Data)
}
}
}
worker函数通过defer和recover捕获panic,并将错误信息封装成Result结构体发送到resultChan通道。主协程从通道中接收结果,判断是否存在错误并进行相应处理,确保即使有goroutine发生panic,也能及时获取信息并继续执行后续逻辑。
3.3 使用 errgroup.Group 时的 panic 处理
errgroup.Group可以方便地并行执行多个任务,并在其中一个任务出错时快速返回错误。然而,它只能处理函数返回的错误,无法自动捕获goroutine内部的panic。因此,需要手动在每个任务函数中添加panic捕获逻辑,并将panic转换为错误返回给errgroup.Group。
3.3.1 方法一:手动封装panic捕获
import (
"fmt"
"golang.org/x/sync/errgroup"
)
func safeGo(g *errgroup.Group, fn func() error) {
g.Go(func() error {
defer func() {
if r := recover(); r != nil {
return fmt.Errorf("panic occurred: %v", r)
}
}()
return fn()
})
}
func main() {
var g errgroup.Group
safeGo(&g, func() error {
// 可能触发panic的任务
panic("unexpected error")
return nil
})
if err := g.Wait(); err != nil {
fmt.Println("Error:", err) // 输出 panic occurred: unexpected error
}
}
3.3.2 封装增强版errgroup
import (
"fmt"
"golang.org/x/sync/errgroup"
"sync"
)
type SafeGroup struct {
g errgroup.Group
mu sync.Mutex
panics []interface{}
}
func (sg *SafeGroup) Go(fn func() error) {
sg.g.Go(func() error {
defer func() {
if r := recover(); r != nil {
sg.mu.Lock()
sg.panics = append(sg.panics, r)
sg.mu.Unlock()
}
}()
return fn()
})
}
func (sg *SafeGroup) Wait() error {
if err := sg.g.Wait(); err != nil {
return err
}
if len(sg.panics) > 0 {
return fmt.Errorf("panics occurred: %v", sg.panics)
}
return nil
}
func main() {
var sg SafeGroup
sg.Go(func() error {
panic("panic in goroutine")
return nil
})
if err := sg.Wait(); err != nil {
fmt.Println("Error:", err) // 输出: panics occurred: [panic in goroutine]
}
}
这两种方案都能有效地在errgroup.Group中处理panic,方案一通过简单的函数封装,在每个任务中添加panic捕获;方案二则通过自定义结构体,将panic信息集中管理,在Wait方法中统一返回错误,方便在复杂场景下对panic进行更灵活的处理。