Go 并发编程深度指南
Go 语言以其内置的并发原语而闻名,通过 goroutine 和 channel 提供了一种高效、安全的并发编程模型。本文将全面解析 Go 的并发机制及其实际应用。
核心概念:Goroutines 和 Channels
1. Goroutines (协程)
Go 的轻量级线程实现,开销极小:
func main() {
// 启动一个协程
go func() {
fmt.Println("Hello from goroutine!")
}()
// 让主程序等待一会儿
time.Sleep(100 * time.Millisecond)
}
2. Channels (通道)
协程间通信的主要方式:
func main() {
// 创建无缓冲通道
ch := make(chan string)
go func() {
time.Sleep(500 * time.Millisecond)
ch <- "message"
}()
// 阻塞等待消息
msg := <-ch
fmt.Println("Received:", msg)
}
并发模式与最佳实践
1. WaitGroup 控制协程组
func processTasks(tasks []string) {
var wg sync.WaitGroup
for i, task := range tasks {
wg.Add(1) // 增加计数
go func(task string, id int) {
defer wg.Done() // 结束时减少计数
processTask(task, id)
}(task, i)
}
wg.Wait() // 等待所有完成
fmt.Println("All tasks completed")
}
2. Worker Pool 模式
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d started job %d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished job %d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动3个worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送9个任务
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
// 接收结果
for a := 1; a <= 9; a++ {
<-results
}
}
3. Select 多路复用
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "One"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Two"
}()
// 同时等待两个通道
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case msg2 := <-ch2:
fmt.Println("Received", msg2)
}
}
}
4. Context 控制协程生命周期
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker canceled")
return
case <-time.After(500 * time.Millisecond):
fmt.Println("Working...")
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
// 运行3秒后取消
time.Sleep(3 * time.Second)
cancel()
// 给worker时间响应取消
time.Sleep(500 * time.Millisecond)
}
5. Mutex 保护共享资源
type SafeCounter struct {
mu sync.Mutex
v int
}
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.v++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.v
}
func main() {
counter := SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc()
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Value())
}
高级并发模式
1. 扇入/扇出 (Fan-in/Fan-out)
// 生产者
func producer(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}
// 消费者
func consumer(done <-chan struct{}, in <-chan int, id int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
// 模拟处理
result := n * n
select {
case out <- result:
case <-done:
return
}
}
}()
return out
}
// 扇入多个通道
func fanIn(done <-chan struct{}, chs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 定义输出函数
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
wg.Add(len(chs))
for _, c := range chs {
go output(c)
}
// 启动goroutine等待所有完成
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
done := make(chan struct{})
defer close(done)
// 创建输入通道
in := producer(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// 启动3个消费者
c1 := consumer(done, in, 1)
c2 := consumer(done, in, 2)
c3 := consumer(done, in, 3)
// 合并结果
for result := range fanIn(done, c1, c2, c3) {
fmt.Println("Result:", result)
}
}
2. Future/Promise 模式
func futureWork(input int) <-chan int {
result := make(chan int)
go func() {
// 模拟耗时操作
time.Sleep(500 * time.Millisecond)
result <- input * 2
close(result)
}()
return result
}
func main() {
f1 := futureWork(5)
f2 := futureWork(10)
// 并行执行后获取结果
r1 := <-f1
r2 := <-f2
fmt.Println("Results:", r1, r2) // 10, 20
}
性能优化与陷阱规避
1. 限制并发数
func controlledWork(workers int) {
sem := make(chan struct{}, workers)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem <- struct{}{} // 获取信号量
defer func() { <-sem }() // 释放信号量
// 执行工作
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}(i)
}
wg.Wait()
}
2. 通道选择与超时
func fetchData(url string, timeout time.Duration) (string, error) {
ch := make(chan string, 1)
go func() {
// 模拟网络请求
time.Sleep(500 * time.Millisecond)
ch <- "Response from " + url
}()
select {
case res := <-ch:
return res, nil
case <-time.After(timeout):
return "", errors.New("request timed out")
}
}
3. 避免竞态条件
// 坏: 共享变量无保护
var count int
for i := 0; i < 100; i++ {
go func() {
count++ // 数据竞争!
}()
}
// 好: 使用互斥锁
var (
mu sync.Mutex
count int
)
for i := 0; i < 100; i++ {
go func() {
mu.Lock()
defer mu.Unlock()
count++
}()
}
// 更好: 使用通道通信
ch := make(chan struct{})
go func() {
count := 0
for range ch {
count++
}
}()
for i := 0; i < 100; i++ {
ch <- struct{}{}
}
并发性能分析工具
-
Race Detector:
go run -race yourprogram.go
-
pprof:
import _ "net/http/pprof" func main() { go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) }() // 程序主体... }
然后使用
go tool pprof http://localhost:6060/debug/pprof/profile
进行分析 -
Benchmark:
func BenchmarkWork(b *testing.B) { for i := 0; i < b.N; i++ { doWork() } }
Go 并发设计哲学
- 不要通过共享内存来通信,而应通过通信来共享内存
- 并发不是并行 - 并发是设计结构,并行是执行方式
- 利用组合而不是继承 - 通过组合小的并发原语构建复杂系统
- 错误处理也是控制流 - 将错误作为值传递,通过通道处理
Go 的并发模型提供了强大而简单的工具集,使开发者能够构建高效、可伸缩的并发系统。通过理解 goroutine、channel 和各种同步原语的使用方法,开发者可以避免许多并发编程的常见陷阱,创建出更加稳健的系统。