WaitGroup
sync.WaitGroup
可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers only guarantee that 64-bit fields are 32-bit aligned.
// For this reason on 32 bit architectures we need to check in state()
// if state1 is aligned or not, and dynamically "swap" the field order if
// needed.
state1 uint64
state2 uint32
}
noCopy
:禁止拷贝。state
:state 是 WaitGroup 的状态数据字段,且是一个无符号64 bit的数据,内容包含了 counter , waiter 信息- counter 代表目前尚未完成的个数,WaitGroup.Add(n) 将会导致 counter += n, 而 WaitGroup.Done() 将导致 counter–。
- waiter 代表目前已调用 WaitGroup.Wait 的 goroutine 的个数。
Add
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// counter 增加数量
state := wg.state.Add(uint64(delta) << 32)
// 取出 counter
v := int32(state >> 32)
// 取出 waiter
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(&wg.sema))
}
// counter < 0,panic
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// delta > 0 && v == int32(delta) : 表示从 0 开始添加计数值
// w!=0 :表示已经有了等待者
// 说明说明在调用了 Wait() 方法之后又想加入新的等待者,这种操作是不允许的
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
// 避免并发调用 Add() 和 Wait()
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
// 唤醒所有 waiter
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Wait
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
// 读取state
// 高32位 ==> counter
// 低32位 ==> waiter
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// counter减到0了,return
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
// counter 不为0,waiters++
if wg.state.CompareAndSwap(state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// 阻塞
runtime_Semacquire(semap)
// 被唤醒,检查state是否等于0
// 不为0,说明存在计数值未恢复为0就重用,panic
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
Done
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Done
只是调用了 Add()
,将groutine - 1。
小结
sync.WaitGroup
必须在sync.WaitGroup.Wait
方法返回之后才能被重新使用。sync.WaitGroup.Done
只是对sync.WaitGroup.Add
方法的简单封装,我们可以向sync.WaitGroup.Add
方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine。- 可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒。