【推荐学习:《nodejs 教程》】
本篇例子来源:http://docs.libuv.org/en/v1.x/guide/networking.html
涉及的知识点
libuv 中网络的实现libuv 解决 accept (emfile错误)bsd 套接字sockaddr_inunix 域协议使用! 在进程间传递“文件描述符”例子 tcp-echo-server/main.clibuv 异步使用 bsd 套接字 的例子
libuv 中的网络和直接使用 bsd 套接字接口没有什么不同,有些事情更简单,都是无阻塞的,但概念都是一样的。此外,libuv 还提供了一些实用的函数来抽象出那些烦人的、重复的、低级的任务,比如使用bsd套接字结构设置套接字、dns查询以及调整各种套接字参数。
int main() { loop = uv_default_loop(); uv_tcp_t server; uv_tcp_init(loop, &server); uv_ip4_addr("0.0.0.0", default_port, &addr); uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0); int r = uv_listen((uv_stream_t*) &server, default_backlog, on_new_connection); if (r) { fprintf(stderr, "listen error %s\n", uv_strerror(r)); return 1; } return uv_run(loop, uv_run_default);}void on_new_connection(uv_stream_t *server, int status) { if (status < 0) { fprintf(stderr, "new connection error %s\n", uv_strerror(status)); // error! return; } uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); uv_tcp_init(loop, client); if (uv_accept(server, (uv_stream_t*) client) == 0) { uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);}
同步的例子这是一个正常同步使用 bsd 套接字 的例子。
作为参照可以发现主要有如下几步
首先调用 socket() 为通讯创建一个端点,为套接字返回一个文件描述符。
接着调用 bind() 为一个套接字分配地址。当使用 socket() 创建套接字后,只赋予其所使用的协议,并未分配地址。在接受其它主机的连接前,必须先调用 bind() 为套接字分配一个地址。
当 socket 和一个地址绑定之后,再调用 listen() 函数会开始监听可能的连接请求。
最后调用 accept, 当应用程序监听来自其他主机的面对数据流的连接时,通过事件(比如unix select()系统调用)通知它。必须用 accept()函数初始化连接。 accept() 为每个连接创立新的套接字并从监听队列中移除这个连接。
int main(void) { struct sockaddr_in stsockaddr; int socketfd = socket(pf_inet, sock_stream, ipproto_tcp); if(-1 == socketfd) { perror("can not create socket"); exit(exit_failure); } memset(&stsockaddr, 0, sizeof(struct sockaddr_in)); stsockaddr.sin_family = af_inet; stsockaddr.sin_port = htons(1100); stsockaddr.sin_addr.s_addr = inaddr_any; if(-1 == bind(socketfd,(const struct sockaddr *)&stsockaddr, sizeof(struct sockaddr_in))) { perror("error bind failed"); close(socketfd); exit(exit_failure); } if(-1 == listen(socketfd, 10)) { perror("error listen failed"); close(socketfd); exit(exit_failure); } for(;;) { int connectfd = accept(socketfd, null, null); if(0 > connectfd) { perror("error accept failed"); close(socketfd); exit(exit_failure); } /* perform read write operations ... */ shutdown(connectfd, shut_rdwr); close(connectfd); } close(socketfd); return 0; }
uv_tcp_initmain > uv_tcp_init
1、对 domain 进行了验证, 需要是下面3种的一种
af_inet 表示 ipv4 网络协议af_inet6 表示 ipv6af_unspec 表示适用于指定主机名和服务名且适合任何协议族的地址2、tcp 也是一种流, 调用 uv__stream_init 对流数据进行初始化
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) { return uv_tcp_init_ex(loop, tcp, af_unspec);}int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) { int domain; /* use the lower 8 bits for the domain */ domain = flags & 0xff; if (domain != af_inet && domain != af_inet6 && domain != af_unspec) return uv_einval; if (flags & ~0xff) return uv_einval; uv__stream_init(loop, (uv_stream_t*)tcp, uv_tcp); ... return 0;}
uv__stream_initmain > uv_tcp_init > uv__stream_init
流的初始化函数使用的地方还是特别多的, 也特别重要。下述 i/o 的完整实现参考 【libuv 源码学习笔记】线程池与i/o
1、对流会被调用的回调函数等进行一个初始化
如 read_cb 函数, 在本例子中 on_new_connection > uv_read_start 函数就会真实的设置该 read_cb 为用户传入的参数 echo_read, 其被调用时机是该 stream 上设置的 io_watcher.fd 有数据写入时, 在事件循环阶段被 epoll 捕获后。alloc_cb 函数的调用过程同 read_cb, alloc 类型函数一般是设置当前需要读取的内容长度, 在流数据传输时通常首先会写入本次传输数据的长度, 然后是具体的内容, 主要是为了接收方能够合理的申请内存进行存储。如 grpc, thread-loader 中都有详细的应用。close_cb 函数被调用在 stream 数据结束时或者出错时。connection_cb 函数如本例子 tcp 流, 当 accept 接收到新连接时被调用。本例子中即为 on_new_connectionconnect_req 结构主要用于 tcp 客户端相关连接回调等数据的挂载使用。shutdown_req 结构主要用于流 destroy 时回调等数据的挂载使用。accepted_fd 当 accept 接收到新连接时, 存储 accept(socketfd, null, null) 返回的 connectfd。queued_fds 用于保存等待处理的连接, 其主要用于 node cluster 集群 的实现。// queued_fds1. 当收到其他进程通过 ipc 写入的数据时, 调用 uv__stream_recv_cmsg 函数2. uv__stream_recv_cmsg 函数读取到进程传递过来的 fd 引用, 调用 uv__stream_queue_fd 函数保存。3. queued_fds 被消费主要在 src/stream_wrap.cc libuvstreamwrap::onuvread > accepthandle 函数中。
2、其中专门为 loop->emfile_fd 通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符, 追踪发现原来是解决 accept (emfile错误), 下面我们讲 uv__accept 的时候再细说这个 loop->emfile_fd 的妙用。
accept处理连接时,若出现 emfile 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃
3、调用 uv__io_init 初始化的该 stream 的 i/o 观察者的回调函数为 uv__stream_io
void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type) { int err; uv__handle_init(loop, (uv_handle_t*)stream, type); stream->read_cb = null; stream->alloc_cb = null; stream->close_cb = null; stream->connection_cb = null; stream->connect_req = null; stream->shutdown_req = null; stream->accepted_fd = -1; stream->queued_fds = null; stream->delayed_error = 0; queue_init(&stream->write_queue); queue_init(&stream->write_completed_queue); stream->write_queue_size = 0; if (loop->emfile_fd == -1) { err = uv__open_cloexec("/dev/null", o_rdonly); if (err < 0) /* in the rare case that "/dev/null" isn't mounted open "/" * instead. */ err = uv__open_cloexec("/", o_rdonly); if (err >= 0) loop->emfile_fd = err; }#if defined(__apple__) stream->select = null;#endif /* defined(__apple_) */ uv__io_init(&stream->io_watcher, uv__stream_io, -1);}
uv__open_cloexecmain > uv_tcp_init > uv__stream_init > uv__open_cloexec
同步调用 open 方法拿到了 fd, 也许你会问为啥不像 【libuv 源码学习笔记】线程池与i/o 中调用 uv_fs_open 异步获取 fd, 其实 libuv 中并不全部是异步的实现, 比如当前的例子启动 tcp 服务前的一些初始化, 而不是用户请求过程中发生的任务, 同步也是能接受的。
int uv__open_cloexec(const char* path, int flags) {#if defined(o_cloexec) int fd; fd = open(path, flags | o_cloexec); if (fd == -1) return uv__err(errno); return fd;#else /* o_cloexec */ int err; int fd; fd = open(path, flags); if (fd == -1) return uv__err(errno); err = uv__cloexec(fd, 1); if (err) { uv__close(fd); return err; } return fd;#endif /* o_cloexec */}
uv__stream_iomain > uv_tcp_init > uv__stream_init > uv__stream_io
双工流的 i/o 观察者回调函数, 如调用的 stream->connect_req 函数, 其值是例子中 uv_listen 函数的最后一个参数 on_new_connection。
当发生 pollin | pollerr | pollhup 事件时: 该 fd 有可读数据时调用 uv__read 函数
当发生 pollout | pollerr | pollhup 事件时: 该 fd 有可读数据时调用 uv__write 函数
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_stream_t* stream; stream = container_of(w, uv_stream_t, io_watcher); assert(stream->type == uv_tcp || stream->type == uv_named_pipe || stream->type == uv_tty); assert(!(stream->flags & uv_handle_closing)); if (stream->connect_req) { uv__stream_connect(stream); return; } assert(uv__stream_fd(stream) >= 0); if (events & (pollin | pollerr | pollhup)) uv__read(stream); if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ if ((events & pollhup) && (stream->flags & uv_handle_reading) && (stream->flags & uv_handle_read_partial) && !(stream->flags & uv_handle_read_eof)) { uv_buf_t buf = { null, 0 }; uv__stream_eof(stream, &buf); } if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ if (events & (pollout | pollerr | pollhup)) { uv__write(stream); uv__write_callbacks(stream); /* write queue drained. */ if (queue_empty(&stream->write_queue)) uv__drain(stream); }}
uv_ip4_addrmain > uv_ip4_addr
uv_ip4_addr 用于将人类可读的 ip 地址、端口对转换为 bsd 套接字 api 所需的 sockaddr_in 结构。
int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) { memset(addr, 0, sizeof(*addr)); addr->sin_family = af_inet; addr->sin_port = htons(port);#ifdef sin6_len addr->sin_len = sizeof(*addr);#endif return uv_inet_pton(af_inet, ip, &(addr->sin_addr.s_addr));}
uv_tcp_bindmain > uv_tcp_bind
从 uv_ip4_addr 函数的实现, 其实是在 addr 的 sin_family 上面设置值为 af_inet, 但在 uv_tcp_bind 函数里面却是从 addr 的 sa_family属性上面取的值, 这让 c 初学者的我又陷入了一阵思考 ...
sockaddr_in 和 sockaddr 是并列的结构,指向 sockaddr_in 的结构体的指针也可以指向 sockaddr 的结构体,并代替它。也就是说,你可以使用 sockaddr_in 建立你所需要的信息,然后用 memset 函数初始化就可以了memset((char*)&mysock,0,sizeof(mysock));//初始化
原来是这样, 这里通过强制指针类型转换 const struct sockaddr* addr 达到的目的, 函数的最后调用了 uv__tcp_bind 函数。
int uv_tcp_bind(uv_tcp_t* handle, const struct sockaddr* addr, unsigned int flags) { unsigned int addrlen; if (handle->type != uv_tcp) return uv_einval; if (addr->sa_family == af_inet) addrlen = sizeof(struct sockaddr_in); else if (addr->sa_family == af_inet6) addrlen = sizeof(struct sockaddr_in6); else return uv_einval; return uv__tcp_bind(handle, addr, addrlen, flags);}
uv__tcp_bindmain > uv_tcp_bind > uv__tcp_bind
调用 maybe_new_socket, 如果当前未设置 socketfd, 则调用 new_socket 获取
调用 setsockopt 用于为指定的套接字设定一个特定的套接字选项
调用 bind 为一个套接字分配地址。当使用socket()创建套接字后,只赋予其所使用的协议,并未分配地址。
int uv__tcp_bind(uv_tcp_t* tcp, const struct sockaddr* addr, unsigned int addrlen, unsigned int flags) { int err; int on; /* cannot set ipv6-only mode on non-ipv6 socket. */ if ((flags & uv_tcp_ipv6only) && addr->sa_family != af_inet6) return uv_einval; err = maybe_new_socket(tcp, addr->sa_family, 0); if (err) return err; on = 1; if (setsockopt(tcp->io_watcher.fd, sol_socket, so_reuseaddr, &on, sizeof(on))) return uv__err(errno);... errno = 0; if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != eaddrinuse) { if (errno == eafnosupport) return uv_einval; return uv__err(errno); }...}
new_socketmain > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket
通过 uv__socket 其本质调用 socket 获取到 sockfd
调用 uv__stream_open 设置 stream i/o 观察的 fd 为步骤1 拿到的 sockfd
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { struct sockaddr_storage saddr; socklen_t slen; int sockfd; int err; err = uv__socket(domain, sock_stream, 0); if (err < 0) return err; sockfd = err; err = uv__stream_open((uv_stream_t*) handle, sockfd, flags); ... return 0;}
uv__stream_openmain > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket > uv__stream_open
主要用于设置 stream->io_watcher.fd 为参数传入的 fd。
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {#if defined(__apple__) int enable;#endif if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) return uv_ebusy; assert(fd >= 0); stream->flags |= flags; if (stream->type == uv_tcp) { if ((stream->flags & uv_handle_tcp_nodelay) && uv__tcp_nodelay(fd, 1)) return uv__err(errno); /* todo use delay the user passed in. */ if ((stream->flags & uv_handle_tcp_keepalive) && uv__tcp_keepalive(fd, 1, 60)) { return uv__err(errno); } }#if defined(__apple__) enable = 1; if (setsockopt(fd, sol_socket, so_oobinline, &enable, sizeof(enable)) && errno != enotsock && errno != einval) { return uv__err(errno); }#endif stream->io_watcher.fd = fd; return 0;}
uv_listenmain > uv_listen
主要调用了 uv_tcp_listen 函数。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int err; err = error_invalid_parameter; switch (stream->type) { case uv_tcp: err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb); break; case uv_named_pipe: err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb); break; default: assert(0); } return uv_translate_sys_error(err);}
uv_tcp_listenmain > uv_listen > uv_tcp_listen
调用 listen 开始监听可能的连接请求
挂载例子中传入的回调 on_new_connection
暴力改写 i/o 观察者的回调, 在上面的 uv__stream_init 函数中, 通过 uv__io_init 设置了 i/o 观察者的回调为 uv__stream_io, 作为普通的双工流是适用的, 这里 tcp 流直接通过 tcp->io_watcher.cb = uv__server_io 赋值语句设置 i/o 观察者回调为 uv__server_io
调用 uv__io_start 注册 i/o 观察者, 开始监听工作。
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { ... if (listen(tcp->io_watcher.fd, backlog)) return uv__err(errno); tcp->connection_cb = cb; tcp->flags |= uv_handle_bound; /* start listening for connections. */ tcp->io_watcher.cb = uv__server_io; uv__io_start(tcp->loop, &tcp->io_watcher, pollin); return 0;}
uv__server_iomain > uv_listen > uv_tcp_listen > uv__server_io
tcp 流的 i/o 观察者回调函数
调用 uv__accept, 拿到该连接的 connectfd
此时如果出现了上面 uv__stream_init 时说的 accept (emfile错误), 则调用 uv__emfile_trick 函数
把步骤1拿到的 connectfd 挂载在了 stream->accepted_fd 上面
调用例子中传入的回调 on_new_connection
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { ... while (uv__stream_fd(stream) != -1) { assert(stream->accepted_fd == -1); err = uv__accept(uv__stream_fd(stream)); if (err < 0) { if (err == uv_eagain || err == uv__err(ewouldblock)) return; /* not an error. */ if (err == uv_econnaborted) continue; /* ignore. nothing we can do about that. */ if (err == uv_emfile || err == uv_enfile) { err = uv__emfile_trick(loop, uv__stream_fd(stream)); if (err == uv_eagain || err == uv__err(ewouldblock)) break; } stream->connection_cb(stream, err); continue; } uv_dec_backlog(w) stream->accepted_fd = err; stream->connection_cb(stream, 0); ...}
uv__emfile_trickmain > uv_listen > uv_tcp_listen > uv__server_io > uv__emfile_trick
在上面的 uv__stream_init 函数中, 我们发现 loop 的 emfile_fd 属性上通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符。
当出现 accept (emfile错误)即文件描述符用尽时的错误时
首先将 loop->emfile_fd 文件描述符, 使其能 accept 新连接, 然后我们新连接将其关闭,以使其低于emfile的限制。接下来,我们接受所有等待的连接并关闭它们以向客户发出信号,告诉他们我们已经超载了--我们确实超载了,但是我们仍在继续工作。
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) { int err; int emfile_fd; if (loop->emfile_fd == -1) return uv_emfile; uv__close(loop->emfile_fd); loop->emfile_fd = -1; do { err = uv__accept(accept_fd); if (err >= 0) uv__close(err); } while (err >= 0 || err == uv_eintr); emfile_fd = uv__open_cloexec("/", o_rdonly); if (emfile_fd >= 0) loop->emfile_fd = emfile_fd; return err;}
on_new_connection当收到一个新连接, 例子中的 on_new_connection 函数被调用
通过 uv_tcp_init 初始化了一个 tcp 客户端流
调用 uv_accept 函数
void on_new_connection(uv_stream_t *server, int status) { if (status < 0) { fprintf(stderr, "new connection error %s\n", uv_strerror(status)); // error! return; } uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); uv_tcp_init(loop, client); if (uv_accept(server, (uv_stream_t*) client) == 0) { uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);}
uv_accepton_new_connection > uv_accept
根据不同的协议调用不同的方法, 该例子 tcp 调用 uv__stream_open 方法
uv__stream_open 设置给初始化完成的 client 流设置了 i/o 观察者的 fd。该 fd 即是 uv__server_io 中提到的 connectfd 。
int uv_accept(uv_stream_t* server, uv_stream_t* client) { int err; assert(server->loop == client->loop); if (server->accepted_fd == -1) return uv_eagain; switch (client->type) { case uv_named_pipe: case uv_tcp: err = uv__stream_open(client, server->accepted_fd, uv_handle_readable | uv_handle_writable); if (err) { /* todo handle error */ uv__close(server->accepted_fd); goto done; } break; case uv_udp: err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); if (err) { uv__close(server->accepted_fd); goto done; } break; default: return uv_einval; } client->flags |= uv_handle_bound;done: /* process queued fds */ if (server->queued_fds != null) { uv__stream_queued_fds_t* queued_fds; queued_fds = server->queued_fds; /* read first */ server->accepted_fd = queued_fds->fds[0]; /* all read, free */ assert(queued_fds->offset > 0); if (--queued_fds->offset == 0) { uv__free(queued_fds); server->queued_fds = null; } else { /* shift rest */ memmove(queued_fds->fds, queued_fds->fds + 1, queued_fds->offset * sizeof(*queued_fds->fds)); } } else { server->accepted_fd = -1; if (err == 0) uv__io_start(server->loop, &server->io_watcher, pollin); } return err;}
uv_read_starton_new_connection > uv_read_start
开启一个流的监听工作
挂载回调函数 read_cb 为例子中的 echo_read, 当流有数据写入时被调用
挂载回调函数 alloc_cb 为例子中的 alloc_buffer
调用 uv__io_start 函数, 这可是老朋友了, 通常用在 uv__io_init 初始化 i/o 观察者后面, 用于注册 i/o 观察者。
uv_read_start 主要是调用了 uv__read_start 函数。开始了普通流的 i/o 过程。
初始化 i/o 观察者在 uv_tcp_init > uv_tcp_init_ex > uv__stream_init > uv__io_init 设置其观察者回调函数为 uv__stream_io注册 i/o 观察者为 uv__io_start 开始监听工作。int uv__read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { assert(stream->type == uv_tcp || stream->type == uv_named_pipe || stream->type == uv_tty); /* the uv_handle_reading flag is irrelevant of the state of the tcp - it just * expresses the desired state of the user. */ stream->flags |= uv_handle_reading; /* todo: try to do the read inline? */ /* todo: keep track of tcp state. if we've gotten a eof then we should * not start the io watcher. */ assert(uv__stream_fd(stream) >= 0); assert(alloc_cb); stream->read_cb = read_cb; stream->alloc_cb = alloc_cb; uv__io_start(stream->loop, &stream->io_watcher, pollin); uv__handle_start(stream); uv__stream_osx_interrupt_select(stream); return 0;}
小结uv_tcp_init 初始化 tcp server handle, 其绑定的 fd 为 socket 返回的 socketfd。uv_tcp_bind 调用 bind 为套接字分配一个地址uv_listen 调用 listen 开始监听可能的连接请求uv_accept 调用 accept 去接收一个新连接uv_tcp_init 初始化 tcp client handle, 其绑定的 fd 为 accept 返回的 acceptfd, 剩下的就是一个普通流的读写 i/o 观察。原文地址:https://juejin.cn/post/6982226661081088036
作者:多小凯
更多编程相关知识,请访问:编程视频!!
以上就是聊聊node.js中的网络与流的详细内容。
