欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

去分析 TCP 连接的源代码 (II) - 接受

最编程 2024-07-09 14:31:21
...

go源码解析之TCP连接系列基于go源码1.16.5

连接是如何建立的

上一章我们通过跟踪net.Listen的调用,了解了socket的创建、端口绑定、开启监听。最后net.Listen返回了一个Listener(具体对于TCP连接为TCPListener),本章将通过该Listener的Accept方法的跟踪来了解连接的建立过程。

ln, err := net.Listen("tcp", ":8080")
if err != nil {
        // handle error
}
for {
        conn, err := ln.Accept()
        if err != nil {
                // handle error
        }
        go handleConnection(conn)
}

下面我们通过逐行跟踪源码,来看连接建立的过程:

1.TCPListener的Accept方法

src/net/tcpsock.go

func (l *TCPListener) Accept() (Conn, error) {
    ...
    c, err := l.accept()
    ...
    return c, nil
}

Accept调用了TCPListener的内部方法accept:

src/net/tcpsock_posix.go

func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}

我们先跳过ln.fd.accept和newTCPConn两个方法调用,将上一章遗留的KeepAlive配置项看一下:
大家应该还记得KeepAlive是ListenConfig中的一个属性,而ListenConfig和创建成功的监听netFD被赋值给了TCPListener:

src/net/tcpsock.go

type TCPListener struct {
    fd *netFD
    lc ListenConfig
}

如果KeepAlive大于等于0,设置socket开启KeepAlive,如果KeepAlive等于0,默认设置socket的TCP_KEEPINTVL和TCP_KEEPIDLE属性为15秒,否则设置为用户指定的时间。

2.setKeepAlive

setKeepAlive和setKeepAlivePeriod方法类似,都是设置socket的属性,我们放到一起来看:

func setKeepAlive(fd *netFD, keepalive bool) error {
    err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(keepalive))
    runtime.KeepAlive(fd)
    return wrapSyscallError("setsockopt", err)
}

func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
    // The kernel expects seconds so round to next highest second.
    secs := int(roundDurationUp(d, time.Second))
    if err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, secs); err != nil {
        return wrapSyscallError("setsockopt", err)
    }
    err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, secs)
    runtime.KeepAlive(fd)
    return wrapSyscallError("setsockopt", err)
}

两个方法中都执行了fd.pfd.SetsockoptInt,而pfd则是netFD中的一个属性:

src/net/fd_posix.go

type netFD struct {
    pfd poll.FD

    ...
}

进一步看poll.FD的SetsockoptInt方法:

src/internal/poll/sockopt.go

func (fd *FD) SetsockoptInt(level, name, arg int) error {
    ...
    return syscall.SetsockoptInt(fd.Sysfd, level, name, arg)
}

可以看到进行了SetsockoptInt的系统调用,进行socket的属性设置。被设置的目标就是fd.Sysfd。回忆上一章中newFD方法,此处的Sysfd就是创建的系统socket的fd。
由于整个net包中不管是监听socket还是主动connect成功的socket还是accept建立的socket,都是使用netFD类进行包装,所以最好记住这个层级关系:
netFD对poll.FD进行包装,poll.FD对系统fd进行包装

介绍一下keepalive的三个内核参数:

  1. tcp_keepalive_time, 如果在该参数指定的秒数内连接始终处于空闲状态(没有收到远程主机的数据,ack不算),则内核向远程主机发起对该主机的探测
  2. tcp_keepalive_intvl,该参数以秒为单位,规定内核向远程主机发送探测的时间间隔
  3. tcp_keepalive_probes,该参数规定内核为了检测远程主机的存活而发送的探测的数量,如果探测的数量已经使用完毕仍旧没有得到响应,即断定不可达,关闭与该主机的连接,释放相关资源

setKeepAlive方法中的SO_KEEPALIVE则是设置keepalive的总开关,setKeepAlivePeriod中的TCP_KEEPINTVL对应tcp_keepalive_intvl参数,TCP_KEEPIDLE对应tcp_keepalive_time参数。TCP_KEEPCNT对应tcp_keepalive_probes,但是代码中没有搜索到使用的地方。

让我们回到accept的主流程,先跟一下ln.fd.accept方法调用:

3.netFD的accept方法

src/net/fd_unix.go

func (fd *netFD) accept() (netfd *netFD, err error) {
    d, rsa, errcall, err := fd.pfd.Accept()
    ...

    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    ...
    return netfd, nil
}

pfd.Accept即poll.FD的Accept方法,代码如下:

src/internal/poll/fd_unix.go

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
    for {
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        ...
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
       ...
        }
        return -1, nil, errcall, err
    }
}

内部方法accept代码如下:

src/internal/poll/sock_cloexec.go

func accept(s int) (int, syscall.Sockaddr, string, error) {
    ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
    
    switch err {
    case nil:
        return ns, sa, "", nil
    ...
    }

    ...
}

Accept4Func同样是一个系统调用方法:var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4,参数s是socket的fd,SOCK_NONBLOCK|SOCK_CLOEXEC则是设置新连接socket的属性。连接成功返回新连接socket的fd和主机地址信息。

我们再返回到poll.FD的Accept方法,accept返回后,如果没有错误就返回新连接socket的fd和主机地址信息。如果错误是EAGAIN(socket被设置为非阻塞模式,在这个socket上的系统调用都会立即返回而不会阻塞线程,例如此处的accept调用,即使没有新的连接,也会立即返回,但是错误信息会被设置为EAGAIN),并且fd.pd.pollable为true时,阻塞当前goroutine进行等待,直到有新的可读消息(此处为有新连接)时continue,再次调用accept进行接收连接。

这里提前简单说一下pollDesc(即FD中的pd),它是IO多路复用(如epoll、kqueue、CompletionPort等)在go语言中的集成,fd.pd.waitRead 即是等待io消息的到来。后续将有单独章节介绍epoll在go语言网络库中的使用。

type FD struct {
    ...
    // I/O poller.
    pd pollDesc
    ...
}

poll.FD的Accept方法返回到netFD的accept方法中,接着调用了newFD创建了netFD,newFD方法在上一章已经介绍,不再赘述。

到目前为止,整个调用链路基本讲完了,我们现在通过下面这张图回顾一下:

  1. TCPListener的accept方法调用netFD的accept方法,返回成功后,调用newTCPConn构建连接对象,并设置连接的keepalive属性
  2. netFD的accept方法调用poll.FD的Accept方法,返回成功后,调用newFD创建新socket的netFD对象
  3. poll.FD的Accept方法进行accept系统调用,如果有新连接建立成功则返回新连接socket的fd,如果遇到EAGAIN错误,则阻塞当前goroutine进行IO消息等待。

4. newTCPConn

src/net/tcp_sock.go

func newTCPConn(fd *netFD) *TCPConn {
    c := &TCPConn{conn{fd}}
    setNoDelay(c.fd, true)
    return c
}

conn是对接口类型Conn的实现,conn的唯一属性则是我们前面一直提到的netFD,conn的核心方法都是对netFD方法的包装:

src/net/net.go

type Conn interface {
    Read(b []byte) (n int, err error)
    Write(b []byte) (n int, err error)
    Close() error
    LocalAddr() Addr
    RemoteAddr() Addr
    SetDeadline(t time.Time) error
    SetReadDeadline(t time.Time) error
    SetWriteDeadline(t time.Time) error
}

type conn struct {
    fd *netFD
}

TCPConn继承自conn,它比较独特的一个方法就是ReadFrom,用来从一个Reader中读取数据并写入到TCPConn的socket上:

src/net/tcpsock.go

type TCPConn struct {
    conn
}

// ReadFrom implements the io.ReaderFrom ReadFrom method.
func (c *TCPConn) ReadFrom(r io.Reader) (int64, error) {
    ...
    n, err := c.readFrom(r)
    ...
    return n, err
}

src/net/tcpsock_posix.go

func (c *TCPConn) readFrom(r io.Reader) (int64, error) {
    if n, err, handled := splice(c.fd, r); handled {
        return n, err
    }
    if n, err, handled := sendFile(c.fd, r); handled {
        return n, err
    }
    return genericReadFrom(c, r)
}

可以看到readFrom进行了两种读取并写入的尝试,这两种方式都是为了减少用户空间到内核空间的数据拷贝:

  1. splice方式,通过建立一个临时的pipe,将输入splice至pipe,再将pipe splice至输出。这里要求Reader必须是tcp或者unix连接
  2. send file方式,通过sendFile系统调用,将Reader中数据高效地传输到socket上。这里要求Reader必须是文件
  3. genericReadFrom,回归到最原始的数据拷贝方式

如果我们需要向socket写入数据并且数据源实现了Reader接口的话,我们可以选择使用ReadFrom方法来提高性能。

5. 小结

今天通过跟踪TCPListener的Accept方法,了解了server侧接收到新连接的过程。总结为以下几个点:

  1. TCPConn继承自conn,conn对netFD进行包装并实现了Conn接口,netFD对poll.FD进行包装,poll.FD对系统fd进行包装
  2. keepalive设置
  3. 避免用户空间到内核空间的数据拷贝的两种方式:splice和sendfile

下一章我们将对TCPConn的Read方法进行跟踪,来了解数据读取的过程。