欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

详解在Node.js中搭建Net模块的网络连接过程

最编程 2024-07-24 11:51:56
...

Node.js中提供了net模块,该模块提供了对TCP、Socket的封装与支持,它包含了创建TCP服务器/客户端的方法。net模块继承自events和stream模块,所以该模块创建的服务器/客户端也是一个事件发射器,而其创建的客户端socket套接字对象又是一个可读写的Stream。net模块网络操作的基础模块,Node.js中其它网络操作相关模块,如:Http模块等,都是基于net模块的进一步封装。

net模块主要实现代码在lib/net.js中
这里分析客户端创建socket建立TCP连接来分析nodejs中相应源码
客户端测试代码:

var net = require('net');  //引入网络模块
var HOST = '127.0.0.1';  //定义服务器端的地址
var PORT = 5000;  //定义服务器端口号

var client = net.createConnection(PORT, HOST);

client.on('connect', function(){
      console.log('成功建立连接');
});  

socket的创建和建立连接

在nodejs中socket的创建和connect建立连接集成在一个用户可见的函数接口中

net.createConnection(options[, connectListener])

其中connectListener将被添加为返回socket上的connect事件上的监视器,使用net.createConnection函数会进入到net模块中,在net.js代码里

/*lib/net.js*/
module.exports = {
  _createServerHandle: createServerHandle,
  _normalizeArgs: normalizeArgs,
  _setSimultaneousAccepts,
  get BlockList() {
    BlockList ??= require('internal/blocklist').BlockList;
    return BlockList;
  },
  get SocketAddress() {
    SocketAddress ??= require('internal/socketaddress').SocketAddress;
    return SocketAddress;
  },
  connect,
  createConnection: connect,     /*here*/
  createServer,
  isIP: isIP,
  isIPv4: isIPv4,
  isIPv6: isIPv6,
  Server,
  Socket,
  Stream: Socket, // Legacy naming
};

可以看到createConnection被挂接在connect函数上

/*lib/net.js    connect构造函数*/
function connect(...args) {
  const normalized = normalizeArgs(args);  /*获取传进来的参数*/
  const options = normalized[0];
  debug('createConnection', normalized);
  const socket = new Socket(options);  /*创建socket*/

  if (options.timeout) {
    socket.setTimeout(options.timeout);
  }

  return socket.connect(normalized);  /*建立连接*/
}

可见connect函数中主要干了两件事,一件是创建socket,另一件就是建立连接

创建socket

new Socket(options)的实现如下所示:

/*lib/net.js */
function Socket(options) {
  if (!(this instanceof Socket)) return new Socket(options);

  this.connecting = false;
  // Problem with this is that users can supply their own handle, that may not
  // have _handle.getAsyncId(). In this case an[async_id_symbol] should
  // probably be supplied by async_hooks.
  this[async_id_symbol] = -1;
  this._hadError = false;
  this[kHandle] = null;
  this._parent = null;
  this._host = null;
  this[kSetNoDelay] = false;
  this[kLastWriteQueueSize] = 0;
  this[kTimeout] = null;
  this[kBuffer] = null;
  this[kBufferCb] = null;
  this[kBufferGen] = null;

  if (typeof options === 'number')
    options = { fd: options }; // Legacy interface.
  else
    options = { ...options };

  // Default to *not* allowing half open sockets.
  options.allowHalfOpen = Boolean(options.allowHalfOpen);
  // For backwards compat do not emit close on destroy.
  options.emitClose = false;
  options.autoDestroy = true;
  // Handle strings directly.
  options.decodeStrings = false;
  ReflectApply(stream.Duplex, this, [options]);

  if (options.handle) {
    this._handle = options.handle; // private
    this[async_id_symbol] = getNewAsyncId(this._handle);
  } else if (options.fd !== undefined) {
    const { fd } = options;
    let err;

    // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
    // a valid `PIPE` or `TCP` descriptor
    /*为_handle属性赋值为TCP实例*/
    this._handle = createHandle(fd, false);

    err = this._handle.open(fd);

    // While difficult to fabricate, in some architectures
    // `open` may return an error code for valid file descriptors
    // which cannot be opened. This is difficult to test as most
    // un-openable fds will throw on `createHandle`
    if (err)
      throw errnoException(err, 'open');

    this[async_id_symbol] = this._handle.getAsyncId();

    if ((fd === 1 || fd === 2) &&
        (this._handle instanceof Pipe) && isWindows) {
      // Make stdout and stderr blocking on Windows
      err = this._handle.setBlocking(true);
      if (err)
        throw errnoException(err, 'setBlocking');

      this._writev = null;
      this._write = makeSyncWrite(fd);
      // makeSyncWrite adjusts this value like the original handle would, so
      // we need to let it do that by turning it into a writable, own
      // property.
      ObjectDefineProperty(this._handle, 'bytesWritten', {
        value: 0, writable: true
      });
    }
  }

  const onread = options.onread;
  if (onread !== null && typeof onread === 'object' &&
      (isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
      typeof onread.callback === 'function') {
    if (typeof onread.buffer === 'function') {
      this[kBuffer] = true;
      this[kBufferGen] = onread.buffer;
    } else {
      this[kBuffer] = onread.buffer;
    }
    this[kBufferCb] = onread.callback;
  }

  // Shut down the socket when we're finished with it.
  this.on('end', onReadableStreamEnd);

  initSocketHandle(this);

  this._pendingData = null;
  this._pendingEncoding = '';

  // If we have a handle, then start the flow of data into the
  // buffer.  if not, then this will happen when we connect
  if (this._handle && options.readable !== false) {
    if (options.pauseOnCreate) {
      // Stop the handle from reading and pause the stream
      this._handle.reading = false;
      this._handle.readStop();
      this.readableFlowing = false;
    } else if (!options.manualStart) {
      this.read(0);
    }
  }

  // Reserve properties
  this.server = null;
  this._server = null;

  // Used after `.destroy()`
  this[kBytesRead] = 0;
  this[kBytesWritten] = 0;
}

其中this._handle = createHandle(fd, false);创建了TCP实例,进入到createHandle函数中

/*lib/net.js*/
function createHandle(fd, is_server) {
  validateInt32(fd, 'fd', 0);
  const type = guessHandleType(fd);
  if (type === 'PIPE') {
    return new Pipe(
      is_server ? PipeConstants.SERVER : PipeConstants.SOCKET
    );
  }

  if (type === 'TCP') {
    /*创建TCP实例*/
    return new TCP(
      is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
    );
  }

  throw new ERR_INVALID_FD_TYPE(type);
}

其中通过new TCP()来创建TCP实例,传入的is_server参数为false,TCP()构造函数中的参数为TCPConstants.SOCKET

/*lib/net.js*/
const {
  TCP,
  TCPConnectWrap,
  constants: TCPConstants
} = internalBinding('tcp_wrap');
/*src/tcp_wrap.cc*/
TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
    : ConnectionWrap(env, object, provider) {
  int r = uv_tcp_init(env->event_loop(), &handle_);
  CHECK_EQ(r, 0);  // How do we proxy this error up to javascript?
                   // Suggestion: uv_tcp_init() returns void.
}

从上方代码中可以看到一步步调用,最终会调用uv_tcp_init创建socket

/*deps/uv/src/unix/tcp.c*/
/*AF_UNSPEC意味着函数返回的是适用于指定主机名和服务名且适合任何协议族的地址。*/
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
  return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);
}
/*deps/uv/src/unix/tcp.c*/
/*初始化,将结构体中的一些变量置为0,NULL之类*/
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
  int domain;

  /* Use the lower 8 bits for the domain */
  domain = flags & 0xFF;
  /*只能是以下三种中的一种,其中AF_UNSPEC并不会做任何操作*/
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;

  if (flags & ~0xFF) /*高八位有值,返回错误信息*/
    return UV_EINVAL;
  /*初始化stream,handle加入loop的handle列表*/
  uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);

  /* If anything fails beyond this point we need to remove the handle from
   * the handle queue, since it was added by uv__handle_init in uv_stream_init.
   */

  if (domain != AF_UNSPEC) {
    int err = maybe_new_socket(tcp, domain, 0);
    if (err) {
      QUEUE_REMOVE(&tcp->handle_queue);
      return err;
    }
  }

  return 0;
}

建立连接connect

/*lib/net.js*/
Socket.prototype.connect = function(...args) {
  let normalized;
  // If passed an array, it's treated as an array of arguments that have
  // already been normalized (so we don't normalize more than once). This has
  // been solved before in https://github.com/nodejs/node/pull/12342, but was
  // reverted as it had unintended side effects.
  if (ArrayIsArray(args[0]) && args[0][normalizedArgsSymbol]) {
    normalized = args[0];
  } else {
    normalized = normalizeArgs(args);
  }
  const options = normalized[0];
  const cb = normalized[1];

  // options.port === null will be checked later.
  if (options.port === undefined && options.path == null)
    throw new ERR_MISSING_ARGS(['options', 'port', 'path']);

  if (this.write !== Socket.prototype.write)
    this.write = Socket.prototype.write;

  if (this.destroyed) {
    this._handle = null;
    this._peername = null;
    this._sockname = null;
  }

  const { path } = options;
  const pipe = !!path;
  debug('pipe', pipe, path);

  if (!this._handle) {
    this._handle = pipe ?
      new Pipe(PipeConstants.SOCKET) :
      new TCP(TCPConstants.SOCKET);
    initSocketHandle(this);
  }

  if (cb !== null) {
    this.once('connect', cb);
  }

  this._unrefTimer();

  this.connecting = true;

  if (pipe) {
    validateString(path, 'options.path');
    defaultTriggerAsyncIdScope(
      this[async_id_symbol], internalConnect, this, path
    );
  } else {
    lookupAndConnect(this, options);  /*核心处理函数*/
  }
  return this;
};
/*lib/net.js*/
function lookupAndConnect(self, options) {
  const { localAddress, localPort } = options;
  const host = options.host || 'localhost';
  let { port } = options;

  if (localAddress && !isIP(localAddress)) {
    throw new ERR_INVALID_IP_ADDRESS(localAddress);
  }

  if (localPort) {
    validateNumber(localPort, 'options.localPort');
  }

  if (typeof port !== 'undefined') {
    if (typeof port !== 'number' && typeof port !== 'string') {
      throw new ERR_INVALID_ARG_TYPE('options.port',
                                     ['number', 'string'], port);
    }
    validatePort(port);
  }
  port |= 0;

  // If host is an IP, skip performing a lookup
  const addressType = isIP(host);
  if (addressType) {
    defaultTriggerAsyncIdScope(self[async_id_symbol], process.nextTick, () => {
      if (self.connecting)
        defaultTriggerAsyncIdScope(
          self[async_id_symbol],
          internalConnect,
          self, host, port, addressType, localAddress, localPort
        );
    });
    return;
  }

  if (options.lookup && typeof options.lookup !== 'function')
    throw new ERR_INVALID_ARG_TYPE('options.lookup',
                                   'Function', options.lookup);


  if (dns === undefined) dns = require('dns');
  const dnsopts = {
    family: options.family,
    hints: options.hints || 0
  };

  if (!isWindows &&
      dnsopts.family !== 4 &&
      dnsopts.family !== 6 &&
      dnsopts.hints === 0) {
    dnsopts.hints = dns.ADDRCONFIG;
  }

  debug('connect: find host', host);
  debug('connect: dns options', dnsopts);
  self._host = host;
  const lookup = options.lookup || dns.lookup;
  defaultTriggerAsyncIdScope(self[async_id_symbol], function() {
    lookup(host, dnsopts, function emitLookup(err, ip, addressType) {
      self.emit('lookup', err, ip, addressType, host);

      // It's possible we were destroyed while looking this up.
      // XXX it would be great if we could cancel the promise returned by
      // the look up.
      if (!self.connecting) return;

      if (err) {
        // net.createConnection() creates a net.Socket object and immediately
        // calls net.Socket.connect() on it (that's us). There are no event
        // listeners registered yet so defer the error event to the next tick.
        process.nextTick(connectErrorNT, self, err);
      } else if (!isIP(ip)) {
        err = new ERR_INVALID_IP_ADDRESS(ip);
        process.nextTick(connectErrorNT, self, err);
      } else if (addressType !== 4 && addressType !== 6) {
        err = new ERR_INVALID_ADDRESS_FAMILY(addressType,
                                             options.host,
                                             options.port);
        process.nextTick(connectErrorNT, self, err);
      } else {
        self._unrefTimer();
        defaultTriggerAsyncIdScope(
          self[async_id_symbol],
          internalConnect,
          self, ip, port, addressType, localAddress, localPort
        );
      }
    });
  });
}

此函数中对地址族的类型进行判断,进入到internalConnect函数中

/*lib/net.js*/
function internalConnect(
  self, address, port, addressType, localAddress, localPort, flags) {
  // TODO return promise from Socket.prototype.connect which
  // wraps _connectReq.

  assert(self.connecting);

  let err;

  if (localAddress || localPort) {
    if (addressType === 4) {
      localAddress = localAddress || DEFAULT_IPV4_ADDR;
      err = self._handle.bind(localAddress, localPort);
    } else { // addressType === 6
      localAddress = localAddress || DEFAULT_IPV6_ADDR;
      err = self._handle.bind6(localAddress, localPort, flags);
    }
    debug('binding to localAddress: %s and localPort: %d (addressType: %d)',
          localAddress, localPort, addressType);

    err = checkBindError(err, localPort, self._handle);
    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', localAddress, localPort);
      self.destroy(ex);
      return;
    }
  }

  if (addressType === 6 || addressType === 4) {
    const req = new TCPConnectWrap();
    req.oncomplete = afterConnect;
    req.address = address;
    req.port = port;
    req.localAddress = localAddress;
    req.localPort = localPort;

    if (addressType === 4)
      err = self._handle.connect(req, address, port);
    else
      err = self._handle.connect6(req, address, port);
  } else {
    const req = new PipeConnectWrap();
    req.address = address;
    req.oncomplete = afterConnect;

    err = self._handle.connect(req, address, afterConnect);
  }

  if (err) {
    const sockname = self._getsockname();
    let details;

    if (sockname) {
      details = sockname.address + ':' + sockname.port;
    }

    const ex = exceptionWithHostPort(err, 'connect', address, port, details);
    self.destroy(ex);
  }
}

在此函数中首先对地址进行绑定bind,然后建立连接connect,根据地址族的不同调用不同的处理函数,不同的处理函数的挂接在

/*src/tcp_wrap.cc*/
void TCPWrap::Initialize(Local<Object> target,
                         Local<Value> unused,
                         Local<Context> context,
                         void* priv) {
  Environment* env = Environment::GetCurrent(context);

  Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
  t->InstanceTemplate()->SetInternalFieldCount(StreamBase::kInternalFieldCount);

  // Init properties
  t->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "reading"),
                             Boolean::New(env->isolate(), false));
  t->InstanceTemplate()->Set(env->owner_symbol(), Null(env->isolate()));
  t->InstanceTemplate()->Set(env->onconnection_string(), Null(env->isolate()));

  t->Inherit(LibuvStreamWrap::GetConstructorTemplate(env));

  env->SetProtoMethod(t, "open", Open);
  env->SetProtoMethod(t, "bind", Bind);
  env->SetProtoMethod(t, "listen", Listen);
  env->SetProtoMethod(t, "connect", Connect);
  env->SetProtoMethod(t, "bind6", Bind6);
  env->SetProtoMethod(t, "connect6", Connect6);
  env->SetProtoMethod(t, "getsockname",
                      GetSockOrPeerName<TCPWrap, uv_tcp_getsockname>);
  env->SetProtoMethod(t, "getpeername",
                      GetSockOrPeerName<TCPWrap, uv_tcp_getpeername>);
  env->SetProtoMethod(t, "setNoDelay", SetNoDelay);
  env->SetProtoMethod(t, "setKeepAlive", SetKeepAlive);

#ifdef _WIN32
  env->SetProtoMethod(t, "setSimultaneousAccepts", SetSimultaneousAccepts);
#endif

  env->SetConstructorFunction(target, "TCP", t);
  env->set_tcp_constructor_template(t);

  // Create FunctionTemplate for TCPConnectWrap.
  Local<FunctionTemplate> cwt =
      BaseObject::MakeLazilyInitializedJSTemplate(env);
  cwt->Inherit(AsyncWrap::GetConstructorTemplate(env));
  env->SetConstructorFunction(target, "TCPConnectWrap", cwt);

  // Define constants
  Local<Object> constants = Object::New(env->isolate());
  NODE_DEFINE_CONSTANT(constants, SOCKET);
  NODE_DEFINE_CONSTANT(constants, SERVER);
  NODE_DEFINE_CONSTANT(constants, UV_TCP_IPV6ONLY);
  target->Set(context,
              env->constants_string(),
              constants).Check();
}

上面代码中可以看出,bind和bind6分别挂接在Bind和Bind6函数中

/*src/tcp_wrap.cc*/
template <typename T>
void TCPWrap::Bind(
    const FunctionCallbackInfo<Value>& args,
    int family,
    std::function<int(const char* ip_address, int port, T* addr)> uv_ip_addr) {
  TCPWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap,
                          args.Holder(),
                          args.GetReturnValue().Set(UV_EBADF));
  Environment* env = wrap->env();
  node::Utf8Value ip_address(env->isolate(), args[0]);
  int port;
  unsigned int flags = 0;
  if (!args[1]->Int32Value(env->context()).To(&port)) return;
  if (family == AF_INET6 &&
      !args[2]->Uint32Value(env->context()).To(&flags)) {
    return;
  }

  T addr;
  int err = uv_ip_addr(*ip_address, port, &addr)
;
  if (err == 0) {
    err = uv_tcp_bind(&wrap->handle_,
                      reinterpret_cast<const sockaddr*>(&addr),
                      flags);
  }
  args.GetReturnValue().Set(err);
}

void TCPWrap::Bind(const FunctionCallbackInfo<Value>& args) {
  Bind<sockaddr_in>(args, AF_INET, uv_ip4_addr);
}


void TCPWrap::Bind6(const FunctionCallbackInfo<Value>& args) {
  Bind<sockaddr_in6>(args, AF_INET6, uv_ip6_addr);
}

从上面代码中可以看出Bind和Bind6函数最终都调用Bind构造函数来实现,在Bind构造函数中,最终调用uv_tcp_bind函数

/*deps/uv/src/uv-common.c*/
int uv_tcp_bind(uv_tcp_t* handle,
                const struct sockaddr* addr,
                unsigned int flags) {
  unsigned int addrlen;

  if (handle->type != UV_TCP)
    return UV_EINVAL;

  if (addr->sa_family == AF_INET)
    addrlen = sizeof(struct sockaddr_in);
  else if (addr->sa_family == AF_INET6)
    addrlen = sizeof(struct sockaddr_in6);
  else
    return UV_EINVAL;

  return uv__tcp_bind(handle, addr, addrlen, flags);
}

此函数中对协议族进行判断然后赋值addrlen相应的结构体的长度,然后调用uv__tcp_bind函数

/*deps/uv/src/unix/tcp.c*/
int uv__tcp_bind(uv_tcp_t* tcp,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int on;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  err = maybe_new_socket(tcp, addr->sa_family, 0);
  if (err)
    return err;

  on = 1;
  if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
    return UV__ERR(errno);

#ifndef __OpenBSD__
#ifdef IPV6_V6ONLY
  if (addr->sa_family == AF_INET6) {
    on = (flags & UV_TCP_IPV6ONLY) != 0;
    if (setsockopt(tcp->io_watcher.fd,
                   IPPROTO_IPV6,
                   IPV6_V6ONLY,
                   &on,
                   sizeof on) == -1) {
#if defined(__MVS__)
      if (errno == EOPNOTSUPP)
        return UV_EINVAL;
#endif
      return UV__ERR(errno);
    }
  }
#endif
#endif

  errno = 0;
  if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) {
    if (errno == EAFNOSUPPORT)
      /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
       * socket created with AF_INET to an AF_INET6 address or vice versa. */
      return UV_EINVAL;
    return UV__ERR(errno);
  }
  tcp->delayed_error = UV__ERR(errno);

  tcp->flags |= UV_HANDLE_BOUND;
  if (addr->sa_family == AF_INET6)
    tcp->flags |= UV_HANDLE_IPV6;

  return 0;
}

注意在此函数中会通过maybe_new_socket函数来判断是否已经创建了socket,有则获取到socket描述符放入到TCP实例中,没有则创建socket

/*deps/uv/src/unix/tcp.c*/
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage saddr;
  socklen_t slen;

  if (domain == AF_UNSPEC) {
    handle->flags |= flags;
    return 0;
  }

  if (uv__stream_fd(handle) != -1) {

    if (flags & UV_HANDLE_BOUND) {

      if (handle->flags & UV_HANDLE_BOUND) {
        /* It is already bound to a port. */
        handle->flags |= flags;
        return 0;
      }

      /* Query to see if tcp socket is bound. */
      slen = sizeof(saddr);
      memset(&saddr, 0, sizeof(saddr));
      if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen))
        return UV__ERR(errno);

      if ((saddr.ss_family == AF_INET6 &&
          ((struct sockaddr_in6*) &saddr)->sin6_port != 0) ||
          (saddr.ss_family == AF_INET &&
          ((struct sockaddr_in*) &saddr)->sin_port != 0)) {
        /* Handle is already bound to a port. */
        handle->flags |= flags;
        return 0;
      }

      /* Bind to arbitrary port */
      if (bind(uv__stream_fd(handle), (struct sockaddr*) &saddr, slen))
        return UV__ERR(errno);
    }

    handle->flags |= flags;
    return 0;
  }

  return new_socket(handle, domain, flags);  /*创建socket*/
}

从上面代码可以看出来,判断此socket是否已经创建是通过uv__stream_fd(handle)是否等于-1,uv__stream_fd(handle)函数

#define uv__stream_fd(handle) ((handle)->io_watcher.fd)

可以看出终究是通过判断io_watcher.fd是否存在来判断socket是否创建成功
创建socket函数:

/*deps/uv/src/unix/tcp.c*/
/*在libuv申请一个socket的逻辑,他还支持新建的socket,可以绑定到一个用户设置的,
或者操作系统随机选择的地址。不过libuv并不直接使用这个函数。而是又封装了一层:maybe_new_socket*/
/*
1 获取一个新的socket fd
2 把fd保存到handle里,并根据flag进行相关设置
3 绑定到本机随意的地址(如果设置了该标记的话)
*/
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage saddr;
  socklen_t slen;
  int sockfd;
  int err;

  // 获取一个socket
  err = uv__socket(domain, SOCK_STREAM, 0);
  printf("came into new_socket, domain:%d\n",domain);
  printf("%d\n",err);
  if (err < 0)
    return err;
  // 申请的fd
  sockfd = err;
  // 设置选项和保存socket的文件描述符到io观察者中
  err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
  printf("after uv__stream_open,err:%d\n",err);
  if (err) {
    uv__close(sockfd);
    return err;
  }

  // 设置了需要绑定标记UV_HANDLE_BOUND   
  if (flags & UV_HANDLE_BOUND) {
    /* Bind this new socket to an arbitrary port */
    slen = sizeof(saddr);
    memset(&saddr, 0, sizeof(saddr));
    // 获取fd对应的socket信息,比如ip,端口,可能没有
    if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen)) {
      uv__close(sockfd);
      return UV__ERR(errno);
    }

    printf("%s:socketfd:%d, addrlen:%d\n",__func__,uv__stream_fd(handle),slen);
    // 绑定到socket中,如果没有则绑定到系统随机选择的地址
    if (bind(uv__stream_fd(handle), (struct sockaddr*) &saddr, slen)) {
      printf("bind wrong\n");
      uv__close(sockfd);
      return UV__ERR(errno);
    }
  }

  return 0;
}
/*deps/uv/src/unix/core.c*/
int uv__socket(int domain, int type, int protocol) {
  int sockfd;
  int err;

#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
  sockfd = socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
  printf("%s: create socket with SOCK_NONBLOCK and SOCK_CLOEXEC",__func__);
  if (sockfd != -1)
    return sockfd;

  if (errno != EINVAL)
    return UV__ERR(errno);
#endif

  sockfd = socket(domain, type, protocol);
  printf("%s: create socket",__func__);
  if (sockfd == -1)
    return UV__ERR(errno);

  err = uv__nonblock(sockfd, 1);
  if (err == 0)
    err = uv__cloexec(sockfd, 1);

  if (err) {
    uv__close(sockfd);
    return err;
  }

#if defined(SO_NOSIGPIPE)
  {
    int on = 1;
    setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
  }
#endif

  return sockfd;
}

上面所示uv__socket函数中我们终于看到了socket系统调用,其中如果定义非阻塞flag,则创建非阻塞socket,否则创建普通socket。
socket创建完成,让我们回到internalConnect里面

/*lib/net.js*/
function internalConnect(
  self, address, port, addressType, localAddress, localPort, flags) {
......
  if (addressType === 6 || addressType === 4) {
    const req = new TCPConnectWrap();
    req.oncomplete = afterConnect;
    req.address = address;
    req.port = port;
    req.localAddress = localAddress;
    req.localPort = localPort;

    if (addressType === 4)
      err = self._handle.connect(req, address, port);
    else
      err = self._handle.connect6(req, address, port);
  } else {
    const req = new PipeConnectWrap();
    req.address = address;
    req.oncomplete = afterConnect;

    err = self._handle.connect(req, address, afterConnect);
  }

  if (err) {
    const sockname = self._getsockname();
    let details;

    if (sockname) {
      details = sockname.address + ':' + sockname.port;
    }

    const ex = exceptionWithHostPort(err, 'connect', address, port, details);
    self.destroy(ex);
  }
}

判断地址类型然后调用self._handle.connect,这个在TCPWrap::Initialize中指定相应的处理函数,最终会调用uv__tcp_connect

/*deps/uv/src/unix/tcp.c*/
int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  int err;
  int r;
  printf("%s: came to uv__tcp_connect.\n",__func__);
  assert(handle->type == UV_TCP);
  // 已经发起了connect了
  if (handle->connect_req != NULL)
    return UV_EALREADY;  /* FIXME(bnoordhuis) UV_EINVAL or maybe UV_EBUSY. */

  if (handle->delayed_error != 0)
    goto out;
  // 申请一个socket和绑定一个地址,如果还没有的话
  err = maybe_new_socket(handle,
                         addr->sa_family,
                         UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  printf("%s: after maybe_new_socket, err:%d\n",__func__,err);
  printf("socketfd:%d, addrlen:%d\n",uv__stream_fd(handle),addrlen);

  if (err)
    return err;

  do {
    // 清除全局错误变量的值
    errno = 0;
    /*三次握手,非阻塞调用*/
    r = connect(uv__stream_fd(handle), addr, addrlen);
    printf("r:%d\n",r);
  } while (r == -1 && errno == EINTR);

  printf("%s: break do while, r=%d, errno=%d\n",__func__,r, errno);

  /* We not only check the return value, but also check the errno != 0.
   * Because in rare cases connect() will return -1 but the errno
   * is 0 (for example, on Android 4.3, OnePlus phone A0001_12_150227)
   * and actually the tcp three-way handshake is completed.
   */
  /*连接错误,判断错误码*/
  if (r == -1 && errno != 0) {
    /*在处在连接中,不是错误,等待连接完成,事件变成可读*/
    if (errno == EINPROGRESS)
      ; /* not an error */
    else if (errno == ECONNREFUSED
#if defined(__OpenBSD__)
      || errno == EINVAL
#endif
      )  /*连接被拒绝*/
    /* If we get ECONNREFUSED (Solaris) or EINVAL (OpenBSD) wait until the
     * next tick to report the error. Solaris and OpenBSD wants to report
     * immediately -- other unixes want to wait.
     */
      handle->delayed_error = UV__ERR(ECONNREFUSED);
    else
      return UV__ERR(errno);
  }

out:
  // 初始化一个连接型request,并设置某些字段
  uv__req_init(handle->loop, req, UV_CONNECT);
  req->cb = cb;
  req->handle = (uv_stream_t*) handle;
  QUEUE_INIT(&req->queue);
  /*挂载到handle,等待可写事件*/
  handle->connect_req = req;
  if(handle->connect_req == NULL){
    printf("%s: handle->connect_req is NULL!\n");
  }
  // 注册到libuv观察者队列
  uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  // 连接出错,插入pending队尾
  if (handle->delayed_error)
    uv__io_feed(handle->loop, &handle->io_watcher);

  return 0;
}

从上述代码中我们能看到connect系统调用,然后将此socket的文件描述符加入到libuv观察者队列中,libuv观察者会观测三次握手是否完成,若三次握手完成,则内核会给出一个信号,libuv观察者会检测到此信号,然后调用连接完成处理函数,这里是afterConnect

/*lib/net.js*/
function afterConnect(status, handle, req, readable, writable) {
  const self = handle[owner_symbol];
  console.log('net.js: came into afterConnect');
  console.log('status: %d', status);

  // Callback may come after call to destroy
  if (self.destroyed) {
    return;
  }

  debug('afterConnect');

  assert(self.connecting);
  self.connecting = false;
  self._sockname = null;

  if (status === 0) {
    if (self.readable && !readable) {
      self.push(null);
      self.read();
    }
    if (self.writable && !writable) {
      self.end();
    }
    self._unrefTimer();

    self.emit('connect');
    self.emit('ready');

    // Start the first read, or get an immediate EOF.
    // this doesn't actually consume any bytes, because len=0.
    if (readable && !self.isPaused())
      self.read(0);

  } else {
    self.connecting = false;
    let details;
    if (req.localAddress && req.localPort) {
      details = req.localAddress + ':' + req.localPort;
    }
    const ex = exceptionWithHostPort(status,
                                     'connect',
                                     req.address,
                                     req.port,
                                     details);
    if (details) {
      ex.localAddress = req.localAddress;
      ex.localPort = req.localPort;
    }
    self.destroy(ex);
  }
}

其中status为0则表示没有出现什么错误,然后我们会发现self.emit('connect');这句话,这句话就是我们nodejs中测试函数中.on("connect")的处理,又一次的回调,最终用户端会输出“成功建立连接”。

推荐阅读