gopool
gopool
是字节跳动开源节流的gopkg
包中协程池的一个实现。
关键结构
协程池:
type pool struct {
// The name of the pool
name string
// capacity of the pool, the maximum number of goroutines that are actually working
// 协程池的最大容量
cap int32
// Configuration information
config *Config
// linked list of tasks
// 任务链表
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32
// Record the number of running workers
// 运行中的协程数
workerCount int32
// This method will be called when the worker panic
// 出现 panic 时调用、
panicHandler func(context.Context, interface{})
}
任务:
type task struct {
ctx context.Context
f func()
next *task
}
worker:
type worker struct {
pool *pool
}
源码分析
先说一下 gopool
的工作流程:
- 通过 Go 或者 CtxGo 方法调用
- 从 taskPool 中取出一个 t
- 如果当前的积压task达到阈值且worker(工作协程)的数量未达到上限,则新建一个worker。
pool.cap
最大工作协程与实际运行的最大协程可能会存在误差。因为新建worker这块不是原子操作:
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
// 工作协程加1
p.incWorkerCount()
w := workerPool.Get().(*worker)
w.pool = p
w.run()
}
worker 的最大数量不会超过pool.cap
。worker run 流程比较简单:
- 循环的从 pool 中取出 task 执行
为了方便查看源码,我把相关代码都粘到了下面的,详细流程如下:
var workerPool sync.Pool
var taskPool sync.Pool
// 初始化 taskPool
func init() {
taskPool.New = newTask
}
func (p *pool) Go(f func()) {
p.CtxGo(context.Background(), f)
}
func (p *pool) CtxGo(ctx context.Context, f func()) {
// 从 taskPool 中取 task,避免频繁创建销毁
t := taskPool.Get().(*task)
t.ctx = ctx
// 赋值执行函数
t.f = f
// 将 t 添加到任务链表里,加锁保证并发安全
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
// 任务链表数量原子加 1
atomic.AddInt32(&p.taskCount, 1)
// The following two conditions are met:
// 1. the number of tasks is greater than the threshold.
// 2. The current number of workers is less than the upper limit p.cap.
// or there are currently no workers.
// 满足以下两个条件:
// 1.任务数大于等于设置的阈值(默认为1)
// 2.当前的协程数低于上限,或者目前没有工人
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
// 工作协程加1
p.incWorkerCount()
w := workerPool.Get().(*worker)
w.pool = p
w.run()
}
}
func (w *worker) run() {
go func() {
for {
var t *task
w.pool.taskLock.Lock()
if w.pool.taskHead != nil {
// 取出任务
t = w.pool.taskHead
w.pool.taskHead = w.pool.taskHead.next
atomic.AddInt32(&w.pool.taskCount, -1)
}
// 没有任务则结束
if t == nil {
// if there's no task to do, exit
w.close()
w.pool.taskLock.Unlock()
w.Recycle()
return
}
w.pool.taskLock.Unlock()
func() {
defer func() {
if r := recover(); r != nil {
if w.pool.panicHandler != nil {
w.pool.panicHandler(t.ctx, r)
} else {
msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
logger.CtxErrorf(t.ctx, msg)
}
}
}()
// 执行
t.f()
}()
t.Recycle()
}
}()
}
func (t *task) Recycle() {
t.zero()
taskPool.Put(t)
}