DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

Build New Goroutine

原文链接

现在主流的线程模型分三种:内核级线程模型、用户级线程模型和两级线程模型(也称混合型线程模型),传统的协程库属于用户级线程模型,而goroutine和它的Go Scheduler在底层实现上其实是属于两级线程模型

内核调度实体(KSE,Kernel Scheduling Entity)

用户线程与内核线程KSE是多对一(N : 1)的映射模型,多个用户线程的一般从属于单个进程并且多线程的调度是由用户自己的线程库来完成,线程的创建、销毁以及多线程之间的协调等操作都是由用户自己的线程库来负责而无须借助系统调用来实现。一个进程中所有创建的线程都只和同一个KSE在运行时动态绑定。但是,该模型有个原罪:并不能做到真正意义上的并发,假设在某个用户进程上的某个用户线程因为一个阻塞调用(比如I/O阻塞)而被CPU给中断(抢占式调度)了,那么该进程内的所有线程都被阻塞(因为单个用户进程内的线程自调度是没有CPU时钟中断的,从而没有轮转调度),整个进程被挂起。所以,很多的协程库会把自己一些阻塞的操作重新封装为完全的非阻塞形式,然后在以前要阻塞的点上,主动让出自己,并通过某种方式通知或唤醒其他待执行的用户线程在该KSE上运行,从而避免了内核调度器由于KSE阻塞而做上下文切换,这样整个进程也不会被阻塞了

两级线程模型是博采众长之后的产物,充分吸收前两种线程模型的优点且尽量规避它们的缺点。在此模型下,用户线程与内核KSE是多对多(N : M)的映射模型:首先,区别于用户级线程模型,两级线程模型中的一个进程可以与多个内核线程KSE关联,也就是说一个进程内的多个线程可以分别绑定一个自己的KSE,这点和内核级线程模型相似;其次,又区别于内核级线程模型,它的进程里的线程并不与KSE唯一绑定,而是可以多个用户线程映射到同一个KSE,当某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行。

###G-P-M

  • G: 表示Goroutine,每个Goroutine对应一个G结构体,G存储Goroutine的运行堆栈、状态以及任务函数,可重用。G并非执行体,每个G需要绑定到P才能被调度执行。
  • P: Processor,表示逻辑处理器, 对G来说,P相当于CPU核,G只有绑定到P(在P的local runq中)才能被调度。对M来说,P提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等,P的数量决定了系统内最大可并行的G的数量(前提:物理CPU核数 >= P的数量),P的数量由用户设置的GOMAXPROCS决定,但是不论GOMAXPROCS设置为多大,P的数量最大为256。
  • M: Machine,OS线程抽象,代表着真正执行计算的资源,在绑定有效的P后,进入schedule循环;而schedule循环的机制大致是从Global队列、P的Local队列以及wait队列中获取G,切换到G的执行栈上并执行G的函数,调用goexit做清理工作并回到M,如此反复。M并不保留G状态,这是G可以跨M调度的基础,M的数量是不定的,由Go Runtime调整,为了防止创建过多OS线程导致系统调度不过来,目前默认最大限制为10000个。

work-stealing的调度算法:

  • 每个P维护一个G的本地队列;
  • 当一个G被创建出来,或者变为可执行状态时,就把他放到P的可执行队列中;
  • 当一个G在M里执行结束后,P会从队列中把该G取出;如果此时P的队列为空,即没有其他G可以执行, M就随机选择另外一个P,从其可执行的G队列中取走一半。

G-P-M模型的定义放在src/runtime/runtime2.go里面,而调度过程则放在了src/runtime/proc.go里。

起因:

1
2
3
4
5
6
7
8
9
10
11
12
func (srv *Server) Serve(l net.Listener) error {
defer l.Close()
...
// 不断循环取出TCP连接
for {
// 看我看我!!!
rw, e := l.Accept()
...
// 再看我再看我!!!
go c.serve(ctx)
}
}

请求大量,goroutine,起大量

goroutine池化是有其现实意义的

目的:实现一个Goroutine Pool,复用goroutine,减轻runtime的调度压力以及缓解内存压力,依托这些优化,在大规模goroutine并发的场景下可以极大地提高并发性能。

启动服务之时先初始化一个 Goroutine Pool 池,这个Pool维护了一个类似栈的LIFO队列 ,里面存放负责处理任务的Worker,然后在client端提交task到Pool中之后,在Pool内部,接收task之后的核心操作是:

  1. 检查当前Worker队列中是否有可用的Worker,如果有,取出执行当前的task;
  2. 没有可用的Worker,判断当前在运行的Worker是否已超过该Pool的容量:{是 —> 再判断工作池是否为非阻塞模式:[是 ——> 直接返回 nil,否 ——> 阻塞等待直至有Worker被放回Pool],否 —> 新开一个Worker(goroutine)处理};
  3. 每个Worker执行完任务之后,放回Pool的队列中等待。

该任务在初始化一个Pool之时启动,每隔一定的时间间隔去检查空闲Worker队列中是否有已经过期的Worker,有则清理掉,通过定时清理过期worker,进一步节省系统资源。

pool结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type sig struct{}

type f func() error

// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type Pool struct {
// capacity of the pool.
capacity int32

// running is the number of the currently running goroutines.
running int32

// expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration

// workers is a slice that store the available workers.
workers []*Worker

// release is used to notice the pool to closed itself.
release chan sig

// lock for synchronous operation.
lock sync.Mutex

once sync.Once
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// get worker
// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
var w *Worker
// 标志变量,判断当前正在运行的worker数量是否已到达Pool的容量上限
waiting := false
// 加锁,检测队列中是否有可用worker,并进行相应操作
p.lock.Lock()
idleWorkers := p.workers
n := len(idleWorkers) - 1
// 当前队列中无可用worker
if n < 0 {
// 判断运行worker数目已达到该Pool的容量上限,置等待标志
waiting = p.Running() >= p.Cap()

// 当前队列有可用worker,从队列尾部取出一个使用
} else {
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]
}
// 检测完成,解锁
p.lock.Unlock()
// Pool容量已满,新请求等待
if waiting {
// 利用锁阻塞等待直到有空闲worker
for {
p.lock.Lock()
idleWorkers = p.workers
l := len(idleWorkers) - 1
if l < 0 {
p.lock.Unlock()
continue
}
w = idleWorkers[l]
idleWorkers[l] = nil
p.workers = idleWorkers[:l]
p.lock.Unlock()
break
}
// 当前无空闲worker但是Pool还没有满,
// 则可以直接新开一个worker执行任务
} else if w == nil {
w = &Worker{
pool: p,
task: make(chan f, 1),
}
w.run()
// 运行worker数加一
p.incRunning()
}
return w
}

定期清理goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// clear expired workers periodically.
func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
for range heartbeat.C {
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 {
p.lock.Unlock()
return
}
n := 0
for i, w := range idleWorkers {
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
break
}
n = i
w.task <- nil
idleWorkers[i] = nil
}
n++
if n >= len(idleWorkers) {
p.workers = idleWorkers[:0]
} else {
p.workers = idleWorkers[n:]
}
p.lock.Unlock()
}
}

Goroutine Pool真正的价值还是在:

  1. 限制并发的goroutine数量;
  2. 复用goroutine,减轻runtime调度压力,提升程序性能;
  3. 规避过多的goroutine侵占系统资源(CPU&内存)。