# TCP网络编程

# 什么是非阻塞IO?

网络分层

注意:RESP 是 redis的协议

实在在应用层

tcp 流程

linux下:三次握手完,生成了socket 文件描述符

Socket

  • 很多系统都提供Socket作为TCP网络连接的抽象
  • Linux为例 -> Internet domain socket -> SOCK_STREAM
  • Linux 中 Socket 以 “文件描述符” FD 作为标识

Socket 通信过程

调用 close() ,进行tcp四次挥手

socket 将tcp的 协议使用 简化成了方法的调用。

# IO模型

  • IO模型指的是同时操作Socket的方案
  • 阻塞
  • 非阻塞
  • 多路复用方案

阻塞IO

  • 同步读写Socket时,线程陷入内核态
  • 当读写成功后,切换回用户态,继续执行
  • 优点:开发难度小,代码简单
  • 缺点:内核态切换开销大

非阻塞IO

  • 如果暂时无法收发数据,会返回错误
  • 应用会不断轮询,直到Socket可以读写
  • 优点:不会陷入内核态,自由度高
  • 缺点:需要自旋轮询

多路复用 --- Linux epll

  • 注册多个Socket事件
  • 调用epool,当有事件发生,返回
  • 优点:提供了事件列表,不需要查询各个Scoket
  • 缺点:开发难度大,逻辑复杂
  • Mac:kqueue; Windows: IOCP

总结

  • 操作系统提供了Socket作为TCP通信的抽象
  • IO模型指的时操作Socket的方案
  • 阻塞模型最利于业务编写,但是性能差
  • 多路复用性能好,但是业务编写麻烦

思考

  • 有没有能结合阻塞模型的多路复用的方法?

# Go是如何抽象Epoll的?

event poll[统计;竞选] ----> 统计 有哪些事件 在发生

阻塞模型 + 多路复用

  • 在底层使用操作系统的多路复用 IO
  • 在协程层次使用阻塞模型
  • 阻塞协程时,协程休眠

epoll抽象层

  • epoll 抽象层是为了统一整个操作系统对多路复用器的实现
  • Linux: Epoll
  • Windows:IOCP
  • Mac: Kqueue

多路复用器

各个系统的多路复用器都有以下功能:

  • 新建多路复用器 epoll_create()
  • 往多路复用器里插入需要监听的事件 epoll_ ctl()
  • 查询发生了什么事件 epoll_wait()

Go Network Poller 多路复用器的抽象

  • Go Network Poller 对于多路复用器的抽象和适配
  • epoll_create() -> netpolllinit()
  • epoll_ctl() -> netpollopen()
  • epoll_wait() -> netpoll()

方法实现:

netpolllinit()

  • 新建Epoll
  • 新建一个pipe管道用于中断Epoll
  • 将"管道有数据到达"事件注册在Epoll中
func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC) // 新建epoll
	if epfd < 0 { // 异常情况下新建epoll
		epfd = epollcreate(1024)
		if epfd < 0 {
			println("runtime: epollcreate failed with", -epfd)
			throw("runtime: netpollinit failed")
		}
		closeonexec(epfd)
	}
	r, w, errno := nonblockingPipe() // 创建unix下的管道,用于关闭epoll
	if errno != 0 {
		println("runtime: pipe failed with", -errno)
		throw("runtime: pipe failed")
	}
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) // 监控管道事件
	if errno != 0 {
		println("runtime: epollctl failed with", -errno)
		throw("runtime: epollctl failed")
	}
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

netpollopen() 插入事件

  • 传入一个Socket的FD,和pollDesc指针
  • pollDesc指针是Socket相关详细信息
  • pollDesc中记录了哪个协程休眠在等待此Socket
  • 将Socket的可读,可写,断开事件注册到Epoll中
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET // 监听四个事件
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
// pollDesc 网络层对 epoll的描述,就是 网路层一个netpoll的描述
type pollDesc struct {
    ...
	rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
	wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13

netpoll() 查询发生了什么事件

  • 调用 epoll_wait(), 查询有哪些事件发生
  • 根据Socket相关的pollDesc信息,返回哪些协程可以唤 醒
func netpoll(delay int64) gList {
    ...
    	var events [128]epollevent // 事件数组,交给epoll wait填写发生了哪些事件
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {  // 没有事件发生
		goto retry  // 循环
	}
    var toRun gList // 创建事件列表
    for i := int32(0); i < n; i++ { // 遍历返回事件
        if ev.events == 0 { // 没有事件发生
			continue
		}
        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            // 断开事件处理
        }
        var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            // 处理读事件
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            // 处理写事件
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.setEventErr(ev.events == _EPOLLERR)
			netpollready(&toRun, pd, mode) //将事件放入Glist
		}
    }
 	...
}

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

Go/src/cmd/vendor/golang.org/x/sys/unix/zsyscall_linux_amd64.go

// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error) {
    	r0, _, e1 := Syscall6(SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) // 系统调用
    	n = int(r0)  // 返回事件的个数
}
1
2
3
4
5

源码展示

Go/src/cmd/vendor/golang.org/x/sys/unix/zsysnum_linux_amd64.go

SYS_EPOLL_CREATE            = 213
1

Go/src/runtime/sys_linux_amd64.s

// int32 runtime·epollcreate(int32 size);
TEXT runtime·epollcreate(SB),NOSPLIT,$0
	MOVL    size+0(FP), DI
	MOVL    $SYS_epoll_create, AX
	SYSCALL
	MOVL	AX, ret+8(FP)
	RET
// runtime 下 epollcreate
1
2
3
4
5
6
7
8

Go/src/runtime/netpoll_epoll.go

func epollcreate(size int32) int32

func netpollinit() {
	...
		epfd = epollcreate(1024)
	}
1
2
3
4
5
6

Go/src/runtime/netpoll.go

平台无关的方法,在总的文件有注释,具体实现在 不同平台的 go 文件中,如:Go/src/runtime/netpoll_epoll.go

// Integrated network poller (platform-independent part).  多路复用器
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
//     Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
//     Arm edge-triggered notifications for fd. The pd argument is to pass
//     back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
//     Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
//     Poll the network. If delta < 0, block indefinitely. If delta == 0,
//     poll without blocking. If delta > 0, block for up to delta nanoseconds.
//     Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
//     Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
//     Reports whether fd is a file descriptor used by the poller.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

总结

Go将多路复用器的操作进行了抽象和适配

  • 将新建多路复用器抽象为了 netpollinit()
  • 将插入监听事件抽象为了 netpollopen()
  • 将查询事件抽象为了netpoll()
  • 但不是返回事件,而是返回等待事件的协程列表

# NetWorkPoller 是如何工作的?

Go/src/runtime/netpoll.go

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 { // 原子操作
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

Network Poller 初始化

  • poll_runtime_pollServerInit()
  • 使用原子操作保证只初始化一次
  • 调用 netpollinit

pollcache 与 pollDesc

  • pollcache: 一个带锁的链表头
  • pollDesc:链表成员
  • pollDesc 是 runtime 包对 Socket的详细描述
  • rg, wg: 1, 或 2, 或者等待协程G地址
type pollCache struct {
	lock  mutex  // 互斥锁保证只有一个协程在操作
	first *pollDesc // 链表,缓存pollDesc
	// PollDesc objects must be type-stable, 
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}
1
2
3
4
5
6
7
8
9

Network Poller 新增监听 Socket

  • poll_runtime_pollOpen()
  • 在 pollcache 链表中分配一个 pollDesc
  • 初始化 pollDesc(rg wg 为 0)
  • 调用 netpollopen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    // fd 是文件描述符,即传入的是 文件 socket的名字或者ID
    pd := pollcache.alloc() // 生成一个 pollDesc
    lock(&pd.lock)
	pd.fd = fd  // pd 与 pollDesc 建立关联
    unlock(&pd.lock)
	errno := netpollopen(fd, pd)
}
1
2
3
4
5
6
7
8

# NetWorkPoller 收发数据

收发数据分为两个场景

  • 协程需要收发数据时,Socket已经可读写
  • 协程需要收发数据时,Socket暂时无法读写

场景一:Socket已经可读写

  • runtime 循环调用netpoll()方法(g0协程)
  • 发现socket可读写时,给对应的rg或者wg置为pdReady(1)
  • 协程调用poll_runtime_pollWait()
  • 判断rg或者wg已经置为pdReady(1),返回0

Go/src/runtime/netpoll_epoll.go

// 系统循环调用查询事件
func netpoll(delay int64) gList {
    ...
    netpollready(&toRun, pd, mode)
    ...
}

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    ...
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
    ...
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := gpp.Load()
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady // 等于1
		}
		if gpp.CompareAndSwap(old, new) { // 可读将gpp := &pd.rg 可读标志位改成1,如果进来就是可读,直接返回nil
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

找到调用方:Go/src/runtime/mgc.go

// gc垃圾回收的时候会周期调用
// 原因就是垃圾回收器不断再跑,在垃圾回收器下钩子
func gcStart(trigger gcTrigger) {
	
}
1
2
3
4
5

协程调用poll_runtime_pollWait取读写数据

C:/Program Files/Go/src/runtime/netpoll.go

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	...
    for !netpollblock(pd, int32(mode), false) {} // 如果可读,往下走没返回无错误
	return pollNoError // 无错误,表示业务协程不会被阻塞
}
    
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    ...
	if gpp.CompareAndSwap(pdReady, 0) {
			return true // 如果是pdReady直接返回true
	}
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13

场景2:Socket暂时无法读写

  • runtime 循环调用netpoll()方法(g0协程)
  • 协程调用poll_runtime_pollWait()
  • 发现对应的rg或者wg为0
  • 给对用的rg或者wg置为协程地址
  • 休眠等待

继续:

  • runtime 循环调用 netpoll()方法(g0协程)
  • 发现Socket可读写时,给对应的查看对应的rg或者wg
  • 若为协程地址(不是0也不是1的话),返回协程地址
  • 调度器调度对应的协程

总结

go 中的 network poller 对各个系统的多路复用器做了抽象,使得pollDesc与socket一一对应,当有socket可读或者可写的时候,修改pollDesc的状态,当业务协程调用socket读写时,如果pollDesc的描述可读写,则直接读写socket,如果不可读写,会将自己的协程地址挂在pollDesc上,等待gc调用,唤醒协程读写。

  • Network Poller 是 Runtime 的强大工具
  • 抽象了多路复用器的操作
  • Network Poller 可以自动监测多个Socket状态
  • 在 Socket 状态可用时,快速返回成功
  • 在Socket 状态不可用时,休眠等待

思考

  • 我们知道了如何检测Socket状态
  • Socket从哪来?
  • 直到Socket可操作后,做什么?

# Go 是如何抽象Socket的?

net 包

  • net 包是 go 原生的网络包
  • net 包实现了 TCP、UDP、HTTP 等网络操作

回顾socket通信过程

net.Listen

net.Listen("tcp", ":8888") // 监听本地的8888端口
1
  • 新建Socket,并执行bind操作
  • 新建一个FD(net 包对Socket 的详情描述)
  • 返回一个TCPListenter对象
  • 将TCPListener 的FD信息加入监听
func Listen(network, address string) (Listener, error) {
	var lc ListenConfig // 网络连接的配置
	return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    // 检查地址配置
    addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
    ...
    l, err = sl.listenTCP(ctx, la) // 调用 listenTcp
    ...
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	// 新建sokect
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
	if err != nil {
		return nil, err
	}
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
		raddr = raddr.toLocal(net)
	}
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

// Go/src/net/sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto) // 系统调用新建一个socket
    if fd, err = newFD(s, family, sotype, net); err != nil {} // 新建一个netFD,net包里对一个socket的详细描述
    ...
    if laddr != nil && raddr == nil {
        switch sotype {
            case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: // socket 是网络流
            // 监听网络流
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
}
        
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    ....
    // 绑定端口
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
    // 监听端口
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
    ...
    if err = fd.init(); err != nil { // 初始化netFD
		return err
	}
}

func (fd *netFD) init() error {
	return fd.pfd.Init(fd.net, true) // 初始化 pollDesc
}
        
func (fd *FD) Init(net string, pollable bool) error {
	...
	err := fd.pd.init(fd)
    ...
}
        
func (pd *pollDesc) init(fd *FD) error {
	...
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	...
}
        
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
	errno := netpollopen(fd, pd) // network poller 抽象层的方法,底层根据不同系统,使用不同的多路复用器
    ...
}

// net包里对于一个socket详细解释
type netFD struct {
	net      string
	laddr    Addr
	pfd      poll.FD 
}

type FD struct {
	pd pollDesc // 记录了pollDesc的信息
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

TCPListener.Accept()

  • 直接调用Socket的accept()
  • 如果失败,休眠等待新的连接
  • 将新的Socket包装为TCPConn变量返回
  • 将TCPConn 的 FD 信息加入监听
  • TCPConn 本质上是一个 ESTABLISHED 状态的 Socket
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
	...
	c, err := l.accept()
	...
}

func (ln *TCPListener) accept() (*TCPConn, error) {
	fd, err := ln.fd.accept() // 拿到fd
	if err != nil {
		return nil, err
	}
	tc := newTCPConn(fd) // 把fd 包装程TCPConn 并返回
	if ln.lc.KeepAlive >= 0 {
		setKeepAlive(fd, true)
		ka := ln.lc.KeepAlive
		if ln.lc.KeepAlive == 0 {
			ka = defaultTCPKeepAlive
		}
		setKeepAlivePeriod(fd, ka)
	}
	return tc, nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    d, rsa, errcall, err := fd.pfd.Accept()
    // d 是新的socket的i的,传给newFD,记录socket信息
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
    // 初始化pollDesc,并调用 runtime_pollOpen 监控socket
    if err = netfd.init(); err != nil {
		netfd.Close()
		return nil, err
	}
}

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
   for {
		s, rsa, errcall, err := accept(fd.Sysfd) // 调用系统accept
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
		case syscall.EINTR: // 出现该错误可以重试
			continue
		case syscall.EAGAIN: // 表示暂时没有数据
			if fd.pd.pollable() {
                // 最后调用 runtime_pollWait, 即网络层的 poll_runtime_pollWait
                // 该方法会查看 socket 是否可读,如果不可读,将该协程休眠记录协程地址,等待唤醒
				if err = fd.pd.waitRead(fd.isFile); err == nil { 
					continue
				}
			}
		case syscall.ECONNABORTED:// 出现该错误可以重试
			// This means that a socket on the listen
			// queue was closed before we Accept()ed it;
			// it's a silly error, so try again.
			continue
		}
		return -1, nil, errcall, err
	}
    ...
}

// Wrapper around the accept system call that marks the returned file
// descriptor as nonblocking and close-on-exec.
func accept(s int) (int, syscall.Sockaddr, string, error) {
    ns, sa, err := AcceptFunc(s)
}

// AcceptFunc is used to hook the accept call. 操作系统的accept调用
var AcceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept

type Conn interface {
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

TCPConn.Read() / Write()

  • 直接Socket 原生读写方法
  • 如果失败,休眠等待可读/可写
  • 被唤醒后调用系统Socket进行读写

一个简单的通信应用

func TestNet(t *testing.T){
	ls, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(any(err))
	}
	conn, err := ls.Accept()
	if err != nil {
		panic(any(err))
	}

	var body [100]byte
	for true {
		_, err = conn.Read(body[:])
		if err != nil {
			panic(any(err))
		}
		fmt.Printf("read message %s /n", string(body[:]))
		_, err = conn.Write(body[:])
		if err != nil {
			panic(any(err))
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

总结

  • net包抽象了TCP网络操作
  • 使用net.Listen()得到TCPListener(LISTEN 状态的 Socket)
  • 使用TCPListener.Accept()得到TCPConn(ESTABLISHED)
  • TCPConn.Read()/Write()进行读写Socket的操作
  • Network Poller 作为上述功能的底层支撑

当前架构图:

net 包获取socket后会直接操作socket,当socket不可用时,交给Network Poller 监听事件,然后休眠,等待被唤醒。

# 实战:怎样结合阻塞模型和多路复用?

使用 一个协程一个连接的方式进行编程。

思路

  • 用主协程监听Listner
  • 每个Conn使用一个新协程处理
func TestNet(t *testing.T){
	ls, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(any(err))
	}
	for true {
		conn, err := ls.Accept()
		if err != nil {
			panic(any(err))
		}
		go HandleConn(conn)
	}
}

func HandleConn(conn net.Conn) {
	defer conn.Close()
	var body [100]byte
	for true {
		_, err := conn.Read(body[:])
		if err != nil {
			panic(any(err))
		}
		fmt.Printf("read message %s /n", string(body[:]))
		_, err = conn.Write(body[:])
		if err != nil {
			panic(any(err))
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

总结

  • groutine-per-connection 编程风格
  • 结合了多路复用的性能那个和阻塞模型的简洁

# 本章总结

  • 操作系统提供了Socket 作为 TCP 通信的抽象
  • IO模型指的是操作系统操作Socket的方案
  • 阻塞模型最利于业务编写,但是性能差
  • 多路复用性能好,但是业务编写麻烦

多路复用抽象

GO将多路复用器进行了抽象和适配

  • 将新建多路复用器抽象为了 netpollinit()
  • 将插入事件抽象为了netpollopen()
  • 将查询事件抽象为了netpoll()
  • 但不是返回事件,而是返回等待事件的协程列表

NetWork Poller 的工作原理

  • Network Poller 是 Runtime 的强大工具
  • 抽象了多路复用器的操作
  • Network Poller 可以自动监测多个Socket状态
  • 在Socket状态可用时,快速返回成功
  • 在Socket状态不可用时,休眠等待

由于nework包是用于监听 socket,创建则需要由net包创建

Net 包

  • net 包抽象了 TCP 网络操作
  • 使用net.Listen()得到TCPListener(LISTEN 状态的 Socket)
  • 使用 TCPListener.Accept()得到TCPConn(ESTABLISHED)
  • TCPConn.Read()/Write()进行读写Socket的操作
  • Network Poller 作为上述功能的底层支撑

goroutine-per-connertion

  • 用主协程监听Listener
  • 每个Conn使用一个新协程处理
  • 结合了多路复用的性能和阻塞模型的简洁

#

上次更新: 3/12/2023,