Nanomsg中的usock:一个高效的异步 Socket 封装

Nanomsg中的usock:一个高效的异步 Socket 封装 1、背景Nanomsg 库中的 usock 模块src/core/usock.h 和 usock_posix.c提供了一套优雅的异步 socket 抽象基于状态机和 worker 线程模型让上层协议无需关心底层 I/O 的复杂性。其设计目标是完全异步所有操作connect、accept、send、recv均不阻塞调用线程统一事件接口通过状态机FSM向上层报告事件连接成功、数据到达、错误等高效优先尝试同步非阻塞操作只在必要时才交给 worker 线程等待跨平台在 Windows 和 POSIX 系统上有对应实现2、核心数据结构和函数struct nn_usock 是一个较大的结构体包含了状态机基类、底层 socket fd、worker 线程引用、I/O 缓冲区、任务和事件等成员其定义如下structnn_usock{/* State machine base class. */structnn_fsmfsm;// 状态机基类实现异步事件驱动,用于管理socket的状态和事件intstate;// 当前socket的状态/* The worker thread the usock is associated with. */structnn_worker*worker;// 该socket所属的工作线程/* The underlying OS socket and handle that represents it in the poller. */ints;// 底层操作系统socket描述符structnn_worker_fdwfd;// 该socket在工作线程poller中的句柄是对s的包装/* Members related to receiving data. */struct{/* The buffer being filled in at the moment. */uint8_t*buf;// 当前接收缓冲区size_tlen;// 缓冲区长度/* Buffer for batch-reading inbound data. */uint8_t*batch;// 批量读取数据的缓冲区/* Size of the batch buffer. */size_tbatch_len;// 批量读取缓冲区长度/* Current position in the batch buffer. The data preceding this position were already received by the user. The data that follow will be received in the future. */size_tbatch_pos;// 批量读取缓冲区当前位置/* File descriptor received via SCM_RIGHTS, if any. */int*pfd;// 用于接收带外文件描述符(SCM_RIGHTS) 这是做什么用的}in;/* Members related to sending data. */struct{/* msghdr being sent at the moment. */structmsghdrhdr;// 当前正在发送的消息头/* List of buffers being sent at the moment. Referenced from hdr. */structioveciov[NN_USOCK_MAX_IOVCNT];// 当前正在发送的缓冲区列表}out;/* Asynchronous tasks for the worker. */// nn_worker_task 只是任务描述并不是任务逻辑// 任务描述本身并不包含任务逻辑而是通过状态机和事件驱动机制来执行具体的任务逻辑structnn_worker_tasktask_connecting;// 对应连接任务structnn_worker_tasktask_connected;// 对应连接完成任务structnn_worker_tasktask_accept;// 对应accept任务structnn_worker_tasktask_send;// 对应发送任务structnn_worker_tasktask_recv;// 对应接收任务structnn_worker_tasktask_stop;// 对应停止任务/* Events raised by the usock. */structnn_fsm_eventevent_established;// 连接建立事件structnn_fsm_eventevent_sent;// 发送完成事件structnn_fsm_eventevent_received;// 接收完成事件structnn_fsm_eventevent_error;// 错误事件/* In ACCEPTING state points to the socket being accepted. In BEING_ACCEPTED state points to the listener socket. */// asock 是一个指针在 ACCEPTING 状态下指向被 accept 的 socket// 在 BEING_ACCEPTED 状态下指向监听 socketstructnn_usock*asock;// 指向正在被accept的socket或者指向监听socket/* Errno remembered in NN_USOCK_ERROR state */interrnum;// 最近一次错误码};3、核心API解析3.1、初始化和启动voidnn_usock_init(structnn_usock*self,intsrc,structnn_fsm*owner);intnn_usock_start(structnn_usock*self,intdomain,inttype,intprotocol);voidnn_usock_start_fd(structnn_usock*self,intfd);voidnn_usock_stop(structnn_usock*self);voidnn_usock_async_stop(structnn_usock*self)这里需要关注的是nn_usock_start_fd 函数末尾多调用了 nn_fsm_action(self-fsm, NN_USOCK_ACTION_STARTED)而 nn_usock_start 没有原因在于两者对文件描述符的预期状态不同nn_usock_start创建一个全新的 socket此时它没有任何连接或绑定需要等待上层进一步调用 nn_usock_listen、nn_usock_connect 等才能进入相应状态。因此状态机启动后停留在 STARTING 状态不发送 ACTION_STARTED否则会错误地将其直接转为 ACTIVE虽然nn_usock_start 函数中确实没有直接对 self-state 赋值因为状态机的状态转换是由 nn_fsm_start 触发的内部事件驱动的nn_usock_start_fd接收一个已存在的文件描述符例如通过 accept 得到的已连接 socket或外部传入的已连接套接字。该 fd 已经处于可读写状态无需再经过绑定、监听或连接步骤。通过发送 NN_USOCK_ACTION_STARTED 动作触发状态机从 STARTING 转换到 ACTIVE参见状态机中 STARTING 状态对此动作的处理将 fd 加入 worker 并切换到 ACTIVE 状态从而立即进入数据传输阶段除了这两个API需要注意还需要注意的是nn_usock_stop调用 nn_fsm_stop 启动状态机的正常停止流程。这是一个同步请求但停止过程本身可能是异步的比如等待 worker 清理资源。它不会立即向上层发送 NN_USOCK_SHUTDOWN 事件而是等待状态机完全进入 IDLE 状态后才会通过 nn_fsm_stopped 上报 NN_USOCK_STOPPED。适用于外部主动关闭 socket 的正常路径nn_usock_async_stop是一个内部使用的立即停止函数。它做了两件事1、worker 提交 task_stop 任务异步清理底层 fd 和 poller 注册2、立即通过 nn_fsm_raise 向上层父状态机发送 NN_USOCK_SHUTDOWN 事件封装在 event_error 中。这样上层能马上知道 socket 已经“逻辑上停止”无需等待 worker 实际完成清理。常用于错误处理如连接断开时快速通知上层或状态机内部需要快速关闭并上报的场景3.2、设置选项和地址intnn_usock_setsockopt(structnn_usock*self,intlevel,intoptname,constvoid*optval,size_toptlen);intnn_usock_bind(structnn_usock*self,conststructsockaddr*addr,size_taddrlen);intnn_usock_listen(structnn_usock*self,intbacklog);这些函数只能在 STARTING 或 ACCEPTED 状态下调用保证 socket 尚未进入活跃 I/O3.3、nn_usock_connectvoidnn_usock_connect(structnn_usock*self,conststructsockaddr*addr,size_taddrlen){intrc;/* Notify the state machine that weve started connecting. */// 告诉self中的fsm这个socket正在尝试连接socket的状态会被设置为NN_USOCK_STATE_CONNECTINGnn_fsm_action(self-fsm,NN_USOCK_ACTION_CONNECT);/* Do the connect itself. */rcconnect(self-s,addr,(socklen_t)addrlen);/* Immediate success. */if(nn_fast(rc0)){nn_fsm_action(self-fsm,NN_USOCK_ACTION_DONE);return;}/* Immediate error. */if(nn_slow(errno!EINPROGRESS)){self-errnumerrno;nn_fsm_action(self-fsm,NN_USOCK_ACTION_ERROR);return;}/* Start asynchronous connect. */nn_worker_execute(self-worker,self-task_connecting);}调用非阻塞 connect。若立即成功状态机直接进入 ACTIVE 并上报 CONNECTED 事件这里其实是报给了父状态机若返回 EINPROGRESS则提交 task_connecting 给 worker将 fd 加入 poller 并关注写事件。连接完成后 worker 触发 FD_OUT状态机检查 SO_ERROR 并上报结果。这里其实是通过下面的代码完成设置的caseNN_USOCK_SRC_TASK_CONNECTING:nn_assert(typeNN_WORKER_TASK_EXECUTE);// 设置对写事件OUT的关注。对于非阻塞 connect连接成功或失败时socket 会变得可写或产生错误事件。// worker 通过写就绪事件来通知 usock 状态机从而在 CONNECTING 状态中检查连接结果通过 getsockopt(SO_ERROR) 判断是否成功nn_worker_add_fd(usock-worker,usock-s,usock-wfd);nn_worker_set_out(usock-worker,usock-wfd);3.4、nn_usock_accept// self用于存放新连接的 nn_usock 对象调用前通常处于空闲状态// listener已经处于监听状态LISTENING的 nn_usock 对象voidnn_usock_accept(structnn_usock*self,structnn_usock*listener){ints;/* Start the actual accepting. */// 如果 self 处于空闲状态则启动其状态机并发送 NN_USOCK_ACTION_BEING_ACCEPTED 动作使其进入 BEING_ACCEPTED 状态if(nn_fsm_isidle(self-fsm)){nn_fsm_start(self-fsm);nn_fsm_action(self-fsm,NN_USOCK_ACTION_BEING_ACCEPTED);}// 让监听 socket 从 LISTENING 状态进入 ACCEPTING 状态表示正在等待新的连接nn_fsm_action(listener-fsm,NN_USOCK_ACTION_ACCEPT);/* Try to accept new connection in synchronous manner. */// 使用 accept 或 accept4 尝试立即接受一个连接。监听 socket 已被设为非阻塞模式因此如果没有新连接会返回 EAGAIN/EWOULDBLOCK#ifNN_HAVE_ACCEPT4saccept4(listener-s,NULL,NULL,SOCK_CLOEXEC);if((s0)(errnoENOTSUP)){/* Apparently some old versions of Linux have a stub for this in libc, without any of the underlying kernel support. */saccept(listener-s,NULL,NULL);}#elsesaccept(listener-s,NULL,NULL);#endif/* Immediate success. */// 成功获得新的文件描述符 s 0则走同步成功路径if(nn_fast(s0)){/* Disassociate the listener socket from the accepted socket. Is useful if we restart accepting on ACCEPT_ERROR */// 清除双方的 asock 指针配对关系解除listener-asockNULL;self-asockNULL;nn_usock_init_from_fd(self,s);// 通知监听 socket 接受完成ACTION_DONE使其回到 LISTENING 状态nn_fsm_action(listener-fsm,NN_USOCK_ACTION_DONE);// 通知被接受 socket 完成ACTION_DONE使其从 BEING_ACCEPTED 进入 ACCEPTED 状态nn_fsm_action(self-fsm,NN_USOCK_ACTION_DONE);return;}/* Detect a failure. Note that in ECONNABORTED case we simply ignore the error and wait for next connection in asynchronous manner. */// EAGAIN / EWOULDBLOCK没有待处理的连接最常见。// ECONNABORTED连接被对方提前中止可忽略。// 资源不足错误ENFILE、EMFILE、ENOBUFS、ENOMEM需要特殊处理errno_assert(errnoEAGAIN||errnoEWOULDBLOCK||errnoECONNABORTED||errnoENFILE||errnoEMFILE||errnoENOBUFS||errnoENOMEM);/* Pair the two sockets. They are already paired in case previous attempt failed on ACCEPT_ERROR */nn_assert(!self-asock||self-asocklistener);self-asocklistener;nn_assert(!listener-asock||listener-asockself);listener-asockself;/* Some errors are just ok to ignore for now. We also stop repeating any errors until next IN_FD event so that we are not in a tight loop and allow processing other events in the meantime */// 这里处理的是资源不足错误ENFILE、EMFILE、ENOBUFS、ENOMEMif(nn_slow(errno!EAGAINerrno!EWOULDBLOCKerrno!ECONNABORTEDerrno!listener-errnum)){listener-errnumerrno;listener-stateNN_USOCK_STATE_ACCEPTING_ERROR;nn_fsm_raise(listener-fsm,listener-event_error,NN_USOCK_ACCEPT_ERROR);return;}/* Ask the worker thread to wait for the new connection. */// 这里最终会走到nn_internal_tasks函数设置worker中epoll相关句柄属性等待accept事件唤醒nn_worker_execute(listener-worker,listener-task_accept);}大体的思路是listener 必须处于 LISTENING 状态self 一般是空闲的先尝试同步 accept若成功则直接初始化 self 并通知双方若失败EAGAIN 等则配对两个 socket提交 task_accept 给 worker监听读事件。当新连接到达时worker 触发 FD_IN状态机调用 accept 完成后续工作。3.5、nn_internal_tasks// 处理从 worker 线程反馈到 FSM 的内部任务事件staticintnn_internal_tasks(structnn_usock*usock,intsrc,inttype){/******************************************************************************//* Internal tasks sent from the user thread to the worker thread. *//******************************************************************************/switch(src){caseNN_USOCK_SRC_TASK_SEND:nn_assert(typeNN_WORKER_TASK_EXECUTE);nn_worker_set_out(usock-worker,usock-wfd);return1;caseNN_USOCK_SRC_TASK_RECV:nn_assert(typeNN_WORKER_TASK_EXECUTE);nn_worker_set_in(usock-worker,usock-wfd);return1;caseNN_USOCK_SRC_TASK_CONNECTED:nn_assert(typeNN_WORKER_TASK_EXECUTE);nn_worker_add_fd(usock-worker,usock-s,usock-wfd);return1;caseNN_USOCK_SRC_TASK_CONNECTING:nn_assert(typeNN_WORKER_TASK_EXECUTE);// 设置对写事件OUT的关注。对于非阻塞 connect连接成功或失败时socket 会变得可写或产生错误事件。// worker 通过写就绪事件来通知 usock 状态机从而在 CONNECTING 状态中检查连接结果通过 getsockopt(SO_ERROR) 判断是否成功nn_worker_add_fd(usock-worker,usock-s,usock-wfd);nn_worker_set_out(usock-worker,usock-wfd);return1;caseNN_USOCK_SRC_TASK_ACCEPT:// 它将监听 socket 加入 worker 的监控集合并开启读事件监听。// 之后当新连接到达时worker 通过 FD_IN 事件触发真正的 accept 调用完成整个异步 accept 流程nn_assert(typeNN_WORKER_TASK_EXECUTE);nn_worker_add_fd(usock-worker,usock-s,usock-wfd);nn_worker_set_in(usock-worker,usock-wfd);return1;}return0;}3.6、异步发送和接收voidnn_usock_send(structnn_usock*self,conststructnn_iovec*iov,intiovcnt);voidnn_usock_recv(structnn_usock*self,void*buf,size_tlen,int*fd);发送先调用 sendmsg 尝试发送全部数据。若成功立即上报 SENT若遇到 EAGAIN则提交 task_send等待可写事件后继续发送接收先尝试从批量缓冲区或直接 recvmsg 读取数据。若一次读满立即上报 RECEIVED否则记录剩余位置提交 task_recv等待可读事件后继续填充4、读懂这个模块需要区分事件和任务事件nn_fsm_event由 usock 发出给上层状态机如 NN_USOCK_CONNECTED、NN_USOCK_RECEIVED、NN_USOCK_ERROR。它们通过 nn_fsm_raise 传递任务nn_worker_task由上层调用 API 时提交给 worker用于执行将 fd 加入 poller、设置关注事件等操作。任务执行完成后worker 通过事件通知 usock