前言
go channel 是 go 并发最核心的组件, 也体现 go 的并发思想:
Do not communicate by sharing memory; instead, share memory by communicating.
chanel 分为两类:
无缓冲channel,可以看作“同步模式”,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输(后面会看到,实际上就是内存拷贝)。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
有缓冲channel称为“异步模式”,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作 (如接收)才会被唤醒。
基本操作:
- 读取
<- chan - 写入
chan <- - 关闭
close(chan) - 获取channel长度
len(chan) - 获取channel容量
cap(chan)
在介绍 channel 的底层原理之前, 先回忆一下进程/线程之间交换数据的方式, 无非就是:
- 共享内存或者加互斥锁
- 先进先出(FIFO)将资源分配给等待时间最长的线程。
go 采用的是第二种方案, go 内存模型对 happens-before 原则实现:
- 修改由多个goroutines同时访问的数据的程序必须串行化这些访问
- 为了实现串行访问,需要使用 channel 操作或其他同步原语(如sync和sync/atomic包中的原语)来保护数据
- go语句创建一个 goroutine,一定发生在 goroutine 执行之前
- 往一个 channel 中发送数据,一定发生在从这个 channel 读取这个数据完成之前
- 一个channel的关闭,一定发生在从这个 channel 读取到零值数据(这里指因为close而返回的零值数据)之前
- 从一个无缓冲 channel 的读取数据,一定发生在往这个 channel 发送数据完成之前
如果违反了上述规则, go 就会抛出 panic
channel 底层原理
数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13
| type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex }
|

sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 waitq 表示,链表中所有的元素都是 sudog 结构:
1 2 3 4
| type waitq struct { first *sudog last *sudog }
|
sudog 代表着等待队列中的一个 goroutine(或者理解为是一个挂起的 goroutine)。
一个 G 可以出现在许多等待队列上,因此一个 G 可能有多个sudog。并且多个 G 可能正在等待同一个同步对象,因此一个对象可能有许多 sudog。sudog 是从特殊池中分配出来的。使用 acquireSudog 和 releaseSudog 分配和释放它们。
创建 chan
chan 使用 make 创建: make(chan int, 10), 最终编译器会转为 : runtime.makechan()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| func makechan(t *chantype, size int) *hchan { elem := t.elem
if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
var c *hchan switch { case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan)
if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
|
发送数据
针对发送数据(-> chan), 主要是调用的 chansend() 函数
1 2 3
| func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) }
|
chansend()函数的主要逻辑是:
1)在chan为 nil 未初始化的情况下,对于 select 这种非阻塞的发送,直接返回 false,对于阻塞的发送,将 goroutine 挂起,并且永远不会返回:
1 2 3 4 5 6 7 8 9 10 11 12 13
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } ...... }
|
2)非阻塞发送的情况下,当 channel 不为 nil,并且 channel 没有关闭时,如果没有缓冲区且没有接收者receiver,或者缓冲区已经满了,返回 false:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| if !block && c.closed == 0 && full(c) { return false }
....... func full(c *hchan) bool { if c.dataqsiz == 0 { return c.recvq.first == nil } return c.qcount == c.dataqsiz }
|
full() 作用是判断在 channel 上发送是否会阻塞,用来判断的参数是qcount,c.recvq.first,dataqsiz前两个变量都是单字长的,所以对它们单个值的读操作是原子性的。
dataqsiz字段,它在创建完 channel 以后是不可变的,因此它可以安全的在任意时刻读取。
3)接下来,对chan加锁,判断chan不是关闭状态,再从recvq队列中取出一个接收者,如果接收者存在,则直接向它发送消息,绕过循环队列buf,此时,由于有接收者存在,则循环队列buf一定是空的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ...... lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ......
|
send() 函数主要完成了 2 件事:调用 sendDirect() 函数将数据拷贝到了接收者的内存地址上;调用 goready() 将等待接收的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ...... if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
|
4)继续往下执行,接下来是有缓冲区的异步发送的逻辑:
如果缓冲区buf还没有满,调用 chanbuf() 获取 sendx 索引的元素指针值。调用 typedmemmove() 方法将发送的值拷贝到缓冲区 buf 中。拷贝完成,增加 sendx 索引下标值和 qcount 个数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) ...... typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
|
5)如果执行前面的步骤还没有成功发送,就表示缓冲区没有空间了,而且也没有任何接收者在等待。后面需要将 goroutine 挂起然后等待新的接收者了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| if !block { unlock(&c.lock) return false }
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep)
|
6)chansend()方法最后的代码是当goroutine唤醒以后,解除阻塞的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ...... if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
|
总结下来整体逻辑是:
- 首先select这种非阻塞的发送,判断两种情况;
- 然后是正常的阻塞调用,先判断recvq等待接收队列是否为空,不为空说明缓冲区中没有内容或者是一个无缓冲channel;
- 如果recvq有接收者,则缓冲区一定为空,直接从recvq中取出一个goroutine,然后写入数据,接着唤醒 goroutine,结束发送过程;
- 如果缓冲区有空余的位置,写入数据到缓冲区,完成发送;
- 如果缓冲区满了,那么就把发送数据的goroutine放到sendq中,进入睡眠,等待该goroutine被唤醒。
接收数据
使用 chan 接收数据有如下操作:
跳过编译流程, 本质上是走到 chanrecv():
1 2 3
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ...... }
|
chanrecv()方法有两个返回值,selected, received bool,前者表示是否接收到值,后者表示接收的值是否关闭后发送的。
有三种情况:
- 如果是非阻塞的情况,没有数据可以接收,则返回 (false,flase);
- 如果 chan 已经关闭了,将 ep 指向的值置为 0值,并且返回 (true, false);
- 其它情况返回值为 (true,true),表示成功从 chan 中获取到了数据,且是chan未关闭发送
进入具体的逻辑
- 首先判断如果chan为空,且是select这种非阻塞调用,那么直接返回 (false,false),否则阻塞当前的goroutine;
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ...... if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } ...... }
|
2)如果是非阻塞调用,通过empty()方法原子判断是无缓冲chan或者是chan中没有数据且chan没有关闭,则返回(false,false),
如果chan关闭,为了防止检查期间的状态变化,二次调用empty()进行原子检查,如果是无缓冲chan或者是chan中没有数据,返回 (true, false),这里的第一个true表示chan关闭后读取的 0 值;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| //非阻塞调用,通过empty()判断是无缓冲chan或者是chan中没有数据 if !block && empty(c) { // 如果chan没有关闭,则直接返回 (false, false) if atomic.Load(&c.closed) == 0 { return } // 如果chan关闭, 为了防止检查期间的状态变化,二次调用empty()进行原子检查,如果是无缓冲chan或者是chan中没有数据,返回 (true, false) if empty(c) { if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } func empty(c *hchan) bool { // c.dataqsiz 是不可变的 if c.dataqsiz == 0 { return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil } return atomic.Loaduint(&c.qcount) == 0 }
|
接下来阻塞调用的逻辑,chanrecv方法对chan加锁,判断chan如果已经关闭,并且chan中没有数据,返回 (true,false),这里的第一个true表示chan关闭后读取的 0 值;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ...... lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } ......
|
接下来,从发送队列中获取一个等待发送的 goroutine,即取出等待队列队头的 goroutine。如果缓冲区的大小为 0,则直接从发送方接收值。否则,对应缓冲区满的情况,从队列的头部接收数据,发送者的值添加到队列的末尾(此时队列已满,因此两者都映射到缓冲区中的同一个下标)。这里需要注意,由于有发送者在等待,所以如果有缓冲区,那么缓冲区一定是满的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| ...... if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
|
- 接下来,是异步接收逻辑,如果缓冲区有数据,直接从缓冲区接收数据,即将缓冲区recvx指向的数据复制到ep接收地址,并且将recvx加1;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
|
- 然后是缓冲区没有数据的情况;如果是select这种非阻塞读取的情况,直接返回(false, false),表示获取不到数据;否则,会获取sudog绑定当前接收者goroutine,调用gopark()挂起当前接收者goroutine,等待chan的其他发送者唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| if !block { unlock(&c.lock) return false, false }
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 }
mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
|
- 最后,当前goroutine被唤醒,完成chan数据的接收,之后进行参数检查,解除chan绑定,并释放sudog。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ...... if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }
|
总结:
- 也是先判断select这种非阻塞接收的两种情况(block为false);然后是加锁进行阻塞调用的逻辑;
- 同步接收:如果发送者队列sendq不为空,且没有缓存区,直接从sendq中取出一个goroutine,读取当前goroutine中的消息,唤醒goroutine, 结束读取的过程;
- 同步接收:如果发送者队列sendq不为空,说明缓冲区已经满了,移动recvx指针的位置,取出一个数据,同时在sendq中取出一个goroutine,拷贝里面的数据到buf中,结束当前读取;
- 异步接收:如果发送者队列sendq为空,且缓冲区有数据,直接在缓冲区取出数据,完成本次读取;
- 阻塞接收:如果发送者队列sendq为空,且缓冲区没有数据。将当前goroutine加入recvq,进入睡眠,等待被发送者goroutine唤醒。
关闭 chan
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist gList
for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
|
关闭chan的步骤是:
先检查异常情况,当 Channel 是一个 nil 空指针或者关闭一个已经关闭的 channel 时,Go 语言运行时都会直接 panic;
关闭的主要工作是释放所有的接收者和发送者:将所有的接收者 readers 的 sudog 等待队列(recvq)加入到待清除队列 glist 中。注意这里是先回收接收者,因为从一个关闭的 channel 中读数据,不会发生 panic,顶多读到一个默认零值。再回收发送者 senders,将发送者的等待队列 sendq 中的 sudog 放入待清除队列 glist 中。注意这里可能会产生 panic,因为往一个关闭的 channel 中发送数据,会产生 panic。