# 高并发下的通信方式:Channel管道
- Channel 理念
- Channel 结构
- Channel 底层原理
# 为什么要用Channel, 共享内存不好用吗?
管道的声明方法
- make(chan int) // 无缓冲
- make(chan bool, 0) // 无缓冲
- make(chan string, 2) // 有缓冲
Channel 基本用法
- ch <- x // 发送数据x
- x = <- ch // 接收数据,赋给x
- <- ch // 接收数据,并丢弃
// 无缓冲区不能直接往里送
func main() {
ch := make(chan int)
ch <- 0
<-ch
}
// fatal error: all goroutines are asleep - deadlock!
// 可以先启动协程接收
func main() {
ch := make(chan int)
go func() {
fmt.Println(<-ch)
}()
ch <- 2
time.Sleep(time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
内存与通信
- 不要通过共享内存的方式进行通信
- 而是应该通过通信的方式共享内存
共享内存的方案:
func Watch(i *int) {
for true {
if *i == 1 {
fmt.Println("hello")
break
}
}
}
func main() {
i := 0
go Watch(&i)
time.Sleep(time.Second)
i = 1
time.Sleep(time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
管道的方式:
func Watch(ch chan int) {
for true {
if <-ch == 1 {
fmt.Println("hello")
break
}
}
}
func main() {
ch := make(chan int)
go Watch(ch)
time.Sleep(time.Second)
ch <- 1
time.Sleep(time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
为什么要使用通信来共享内存?
- 避免协程竞争和数据冲突的问题
- 更高级的抽象,降低开发难度,增加程序的可读性
- 模块之间更容易解耦,增强拓展性和可维护性
# 如何设计高性能的 Channel
如何设计 Channel

源码结构体:
type hchan struct {
// 环形缓冲区,环形缓存可以大幅降低GC开销
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
elemtype *_type // element type
// 发送链表 与 发送到的位置
sendx uint // send index
sendq waitq // list of send waiters
// 接收链表 与 接收到的位置
recvx uint // receive index
recvq waitq // list of recv waiters
// 锁
lock mutex
// 状态
closed uint32
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

两个队列:

互斥锁
- 互斥锁并不是排队发送/接收数据
- 互斥锁保护的chan结构体本身
- Channel并不是无锁的
状态值
- 0 开启
- 1 关闭
总结
- “Channel 是 go 的一等公民”

# Channel 发送数据的底层原理是什么?
c <- 关键字
- c <- 关键字 是一个语法糖
- 编译阶段,会把 c <- 转化为 runtime.chansend1()
- chansend1()会调用chansend()方法
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
1
2
3
4
5
2
3
4
5
管道发送:直接发送
- 发送数据前,已经有G在休眠等待接收
- 此时缓存肯定是空的,不用考虑缓存
- 将数据直接拷贝给G的接收变量,唤醒G

代码实现:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock) // 上锁
if sg := c.recvq.dequeue(); sg != nil { // 接收队列不等于空,取出 sudog
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep) // 传送值
sg.elem = nil
}
goready(gp, skip+1) // 唤醒g
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
管道发送:放入缓存
- 没有G在休眠等待,但是有缓存空间
- 将数据放入缓存
实现:
- 获取可存入的缓存地址
- 存入数据
- 维护索引
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock) // 上锁
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 获取可存入的缓存地址
typedmemmove(c.elemtype, qp, ep) // 存入数据
c.sendx++ // 维护索引
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++ // 维护索引
unlock(&c.lock) // 解锁
return true
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
管道发送:休眠等待
- 把自己包装成 sudog
- sudog 放入 sendq 队列
- 休眠并解锁
- 被唤醒后,数据已经被取走,维护其他数据
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 获取当前协程,并包装成sudog
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
// 放入 sendq 队列
c.sendq.enqueue(mysg)
// 休眠
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
}
// 解锁
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
unlock((*mutex)(chanLock))
return true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type sudog struct {
elem unsafe.Pointer // 指向管道赋值的变量
}
1
2
3
2
3
总结
- 编译阶段,会把 <- 转化未 runtime.chansend1()
- 直接发送时,将数据直接拷贝到目标变量
- 放入缓存,将数据放入环形缓存,成功返回
- 休眠等待时,将自己包装后放入sendq,休眠
# Channel 接收数据的底层原理是什么?
<- 关键字
- <- 关键字 是一个语法糖
- 编译阶段, i <- c 转化为 runtime.chanrecv1()
- 编译阶段, i, ok <- 转化为 runtime.chanrecv2()
- 最后会调用 chanrecv() 方法
Channel 接收情况
- 有等待的G,从G接收
- 有等待的G,从缓存接收
- 接收缓存
- 阻塞接收
有等待的G, 从G接收
原理:
- 接收数据前,已经有G在休眠等待发送
- 而且这个Channel没有缓存
- 将数据直接从G拷贝过来,唤醒G
实现:
- 判断是否有 G 在发送队列等待,进入recv()
- 判断此 Channel 无缓存
- 直接从等待的G中取走数据,唤醒G
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock) // 上锁
if c.closed != 0 { // 关闭则退出
return true, false
} else {
if sg := c.sendq.dequeue(); sg != nil { // 从发送队列取一个协程
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 接收数据
return true, true
}
}
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { // 判断缓存区是否是空的
recvDirect(c.elemtype, sg, ep) // 直接接收
}
// 修改状态
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1) // 唤醒
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
有等待的G,从缓存接收
- 接收数据前,已经有G在休眠等待发送
- 而且这个Channel有缓存
- 从缓存取走一个数据
- 将休眠G的数据放进护缓存,唤醒G
实现:
- 判断有G在发送队列等待,进入recv()
- 判断此Channel有缓存
- 从缓存中取走一个数据
- 将G的数据放入缓存,唤醒G
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
qp := chanbuf(c, c.recvx) // 冲channel里取出一个单元数据
typedmemmove(c.elemtype, ep, qp) // 将队列数据考给接收者
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem) // 将数据从发送者拷贝到队列
c.recvx++ // 维护索引
goready(gp, skip+1) // 唤醒G
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
接收缓存
- 没有G在休眠等待发送,但是缓存有内容
- 从缓存取走数据
实现:
- 判断没有G在发送队列等待
- 判断此Channel有缓存
- 从缓存中取走一个数据
阻塞接收
- 没有G在休眠等待,而且没有缓存或缓存空
- 自己进入接收队列,休眠等待
实现:
- 判断没有G在发送队列等待
- 判断此Channel无缓存
- 将自己包装成sudog
- sudog放入接收等待队列,休眠
- 唤醒时,发送的G已经把数据拷贝到位
总结
- 编译阶段, <- c 会转化为 chanrecv()
- 有等待的G,且无缓存时,从G接收
- 有等待的G,且有缓存时,从缓存接收
- 无等待的G,且缓存有数据,从缓存接收
- 无等待的G,且缓存无数据,等待喂数据
# 实战:非阻塞的Channel怎么做?
func main() {
ch := make(chan int)
ch1 := make(chan int)
select {
case <- ch:
fmt.Println("ch recv")
case <- ch1:
fmt.Println("ch1 recv")
default:
fmt.Println("nothing !!")
}
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
select 原理
- 同时存在接收,发送,默认路径
- 首先查看是否有可以立即执行的case
- 没有的话,有default,走default
- 没有default,把自己注册在所有的channel中,休眠等待
timer
- timer 可以提供一个 channel,定时塞入数据
# 小结
为什么使用Channel
相对于无锁
- 避免协程竞争和数据冲突的问题
相对于加锁
- 更高级的抽象,降低开发难度,增加程序可读性
- 模块之间更容易解耦,增加扩展和可维护性
Channel基本结构
- 一个环形缓存
- 两个链表(发送协程/接收协程)
- 一个互斥锁(保护hchan)
- 一个状态值
Channel 数据发送原理
- 直接发送时,将数据直接拷贝到目标变量
- 放入缓存时,将数据放入环形缓存,成功返回
- 休眠等待时,将自己包装后放入sendq,休眠
Channel 数据接收原理
- 有等待的G,且无缓存时,从G接收
- 有等待的G,且有缓存时,从缓存接收
- 无等待的G,且缓存有数据,从缓存接收
- 无等待的G,且缓存无数据,等待喂数据
非阻塞 Channel
- 使用select可以使用Channel的非阻塞特性
- 使用 timer 配合Slect 可实现超时特性
← 高并发下的保安-锁机制 TCP网络编程 →