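Microservices are everywhere today, and internal services mostly talk to each other over long-lived connections. Go's native net library, however, does not offer enough performance or control for this: it cannot sense connection state, large numbers of connections lead to low utilization, and there is no way to bound the number of goroutines. netpoll was created to gain full control over the network layer, and to explore ahead of business needs so that the results eventually benefit the business.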

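With the native library, every established connection gets its own goroutine. In the long-connection model typical of microservices, most of these connections sit idle, which puts a heavy burden on epoll, and the many sleeping goroutines tie up a lot of resources.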

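netpoll instead uses asynchronous callbacks plus a goroutine pool (Routine Pool). When a read/write event occurs on a connection, a callback triggers the business logic: the callback function is submitted to the Routine Pool, which concentrates the work on a bounded set of goroutines, while the upper-layer logic still keeps a single-threaded programming experience.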

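Synchronous mode: data is read directly from the kernel. The hardware copies it into the kernel, and the kernel copies it into user memory: two copies.

Asynchronous mode with a buffer: reading from the kernel still takes two copies (hardware to kernel, kernel to an in-memory buffer).
The asynchronous call then copies the data from that buffer into the business logic: one more copy, three in total.

Asynchronous mode with the NoCopy API: reading from the kernel takes the same two copies (hardware to kernel, kernel to the in-memory buffer).
The asynchronous call then hands the data to the business logic through the NoCopy API: no further copy.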
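The user-space half of this difference (one extra copy versus none) can be illustrated with a toy buffer. The type `buffer` and its methods `ReadCopy` and `NextNoCopy` are hypothetical; the kernel-side copies are not modeled.

```go
package main

import "fmt"

// buffer is a hypothetical user-space receive buffer filled by the poller.
type buffer struct {
	data []byte
	off  int
}

// ReadCopy copies the next n bytes into a new slice: the extra copy of the
// "asynchronous with a buffer" mode.
func (b *buffer) ReadCopy(n int) []byte {
	p := make([]byte, n)
	copy(p, b.data[b.off:b.off+n])
	b.off += n
	return p
}

// NextNoCopy returns a view of the next n bytes without copying. The slice
// aliases the buffer (capacity capped so appends cannot overwrite it) and is
// only valid until the buffer is reused.
func (b *buffer) NextNoCopy(n int) []byte {
	p := b.data[b.off : b.off+n : b.off+n]
	b.off += n
	return p
}

func main() {
	b := &buffer{data: []byte("hello world")}
	c := b.ReadCopy(5)     // independent copy of "hello"
	v := b.NextNoCopy(6)   // view of " world"
	b.data[6] = 'W'        // mutate the underlying buffer
	fmt.Println(string(c)) // hello
	fmt.Println(string(v)) //  World (the view sees the change)
}
```

The trade-off is lifetime: the NoCopy view is only valid while the underlying buffer is not reused, which is why the Reader interface below pairs zero-copy reads with an explicit Release.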

// Reader is a collection of operations for nocopy reads.
//
// For ease of use, it is recommended to implement Reader as a blocking interface,
// rather than simply fetching the buffer.
// For example, the return of calling Next(n) should be blocked if there are fewer than n bytes, unless timeout.
// The return value is guaranteed to meet the requirements or an error will be returned.
type Reader interface {
   // Next returns a slice containing the next n bytes from the buffer,
   // advancing the buffer as if the bytes had been returned by Read.
   //
   // If there are fewer than n bytes in the buffer, Next blocks
   // until enough data arrives or an error occurs (such as a wait timeout).
   //
   // The slice p is only valid until the next call to the Release method.
   // Next is not globally optimal, and Skip, ReadString, ReadBinary methods
   // are recommended for specific scenarios.
   //
   // Return: len(p) must be n or 0, and p and error cannot be nil at the same time.
   Next(n int) (p []byte, err error)
   // Peek returns the next n bytes without advancing the reader.
   // Other behavior is the same as Next.
   Peek(n int) (buf []byte, err error)
   // Skip the next n bytes and advance the reader, which is
   // a faster implementation of Next when the next data is not used.
   Skip(n int) (err error)
   // Until reads until the first occurrence of delim in the input,
   // returning a slice that ends with (and includes) delim.
   // If Until encounters an error before finding a delimiter,
   // it returns all the data in the buffer and the error itself (often ErrEOF or ErrConnClosed).
   // Until returns err != nil only if line does not end in delim.
   Until(delim byte) (line []byte, err error)
   // ReadString is a faster implementation of Next when a string needs to be returned.
   // It replaces:
   //
   //  var p, err = Next(n)
   //  return string(p), err
   //
   ReadString(n int) (s string, err error)
   // ReadBinary is a faster implementation of Next when it needs to
   // return a copy of the slice that is not shared with the underlying layer.
   // It replaces:
   //
   //  var p, err = Next(n)
   //  var b = make([]byte, n)
   //  copy(b, p)
   //  return b, err
   //
   ReadBinary(n int) (p []byte, err error)
   // ReadByte is a faster implementation of Next when a byte needs to be returned.
   // It replaces:
   //
   //  var p, err = Next(1)
   //  if err != nil {
   //     return 0, err
   //  }
   //  return p[0], nil
   //
   ReadByte() (b byte, err error)
   // Slice returns a new Reader containing the next n bytes from this reader,
   // the operation is zero-copy, similar to b = p[:n].
   Slice(n int) (r Reader, err error)
   // Release the memory space occupied by all read slices. This method needs to be executed actively to
   // recycle the memory after confirming that the previously read data is no longer in use.
   // After invoking Release, the slices obtained by the method such as Next, Peek, Skip will
   // become an invalid address and cannot be used anymore.
   Release() (err error)
   // Len returns the total length of the readable data in the reader.
   Len() (length int)
}
// Writer is a collection of operations for nocopy writes.
//
// The usage of the design is a two-step operation, first apply for a section of memory,
// fill it and then submit. E.g:
//
//  var buf, _ = Malloc(n)
//  buf = append(buf[:0], ...)
//  Flush()
//
// Note that it is not recommended to submit self-managed buffers to Writer.
// Since the writer is processed asynchronously, if the self-managed buffer is used and recycled after submission,
// it may cause inconsistent life cycle problems. Of course this is not within the scope of the design.
type Writer interface {
   // Malloc returns a slice containing the next n bytes from the buffer,
   // which will be written after submission (e.g. Flush).
   //
   // The slice p is only valid until the next submit(e.g. Flush).
   // Therefore, please make sure that all data has been written into the slice before submission.
   Malloc(n int) (buf []byte, err error)
   // WriteString is a faster implementation of Malloc when a string needs to be written.
   // It replaces:
   //
   //  var buf, err = Malloc(len(s))
   //  n = copy(buf, s)
   //  return n, err
   //
   // The argument string s will be referenced based on the original address and will not be copied,
   // so make sure that the string s will not be changed.
   WriteString(s string) (n int, err error)
   // WriteBinary is a faster implementation of Malloc when a slice needs to be written.
   // It replaces:
   //
   //  var buf, err = Malloc(len(b))
   //  n = copy(buf, b)
   //  return n, err
   //
   // The argument slice b will be referenced based on the original address and will not be copied,
   // so make sure that the slice b will not be changed.
   WriteBinary(b []byte) (n int, err error)
   // WriteByte is a faster implementation of Malloc when a byte needs to be written.
   // It replaces:
   //
   //  var buf, _ = Malloc(1)
   //  buf[0] = b
   //
   WriteByte(b byte) (err error)
   // WriteDirect is used to insert an additional slice of data on the current write stream.
   // For example, if you plan to execute:
   //
   //  var bufA, _ = Malloc(nA)
   //  WriteBinary(b)
   //  var bufB, _ = Malloc(nB)
   //
   // It can be replaced by:
   //
   //  var buf, _ = Malloc(nA+nB)
   //  WriteDirect(b, nB)
   //
   // where buf[:nA] = bufA, buf[nA:nA+nB] = bufB.
   WriteDirect(p []byte, remainCap int) error
   // MallocAck will keep the first n malloc bytes and discard the rest.
   // The following behavior:
   //
   //  var buf, _ = Malloc(8)
   //  buf = buf[:5]
   //  MallocAck(5)
   //
   // is equivalent to:
   //  var buf, _ = Malloc(5)
   //
   MallocAck(n int) (err error)
   // Append the argument writer to the tail of this writer and set the argument writer to nil,
   // the operation is zero-copy, similar to p = append(p, w.p).
   Append(w Writer) (err error)
   // Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned.
   // Its behavior is equivalent to io.Writer's Write with the data (slice b) already supplied.
   Flush() (err error)
   // MallocLen returns the total length of the writable data that has not yet been submitted in the writer.
   MallocLen() (length int)
}
// LinkBuffer implements ReadWriter.
type LinkBuffer struct {
   length     int32
   mallocSize int
   // head pointer: sits at the front; on Release it catches up with the read pointer and releases the nodes it passes
   head  *linkBufferNode // release head
   // read pointer: marks the node currently being read
   read  *linkBufferNode // read head
   // flush pointer: catches up with the write pointer on Flush; the read pointer can read at most up to the flush pointer
   flush *linkBufferNode // malloc head
   // write pointer: marks the node that data has been written up to
   write *linkBufferNode // malloc tail
   // buffers borrowed from mcache, returned on Release
   caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
}
// linkBufferNode defines a node of the LinkBuffer.
//
type linkBufferNode struct {
   buf      []byte          // data buffer
   off      int             // read-offset
   malloc   int             // write-offset
   refer    int32           // reference count of this node
   readonly bool            // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false
   origin   *linkBufferNode // the root node of the extends, i.e. the original node being referenced
   next     *linkBufferNode // the next node of the linked buffer
}
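Why these fields make Next zero-copy most of the time can be sketched with a much smaller structure. `linkBuf` and `node` are hypothetical simplifications: only the read pointer is kept, there is no concurrency, and the flush/write pointers, reference counts and mcache are left out. The point shown is that a read inside one node returns a sub-slice, while a read spanning nodes has to allocate and copy.

```go
package main

import "fmt"

// node is a stripped-down linkBufferNode: data plus a read offset.
type node struct {
	buf  []byte
	off  int
	next *node
}

// linkBuf is a hypothetical, single-threaded reduction of LinkBuffer that keeps
// only the read pointer; flush/write pointers and ref counts are omitted.
type linkBuf struct {
	read   *node
	length int // unread bytes across all nodes
	copies int // Next calls that had to copy across a node boundary
}

func newLinkBuf(chunks ...string) *linkBuf {
	lb := &linkBuf{}
	var tail *node
	for _, c := range chunks {
		n := &node{buf: []byte(c)}
		if tail == nil {
			lb.read = n
		} else {
			tail.next = n
		}
		tail = n
		lb.length += len(c)
	}
	return lb
}

// Next returns the next n bytes. Inside one node it returns a sub-slice (zero
// copy); across nodes it must allocate and copy, the kind of buffer the real
// LinkBuffer records in `caches` so it can be freed on Release.
func (lb *linkBuf) Next(n int) ([]byte, error) {
	if n > lb.length {
		return nil, fmt.Errorf("need %d bytes, have %d", n, lb.length)
	}
	if n == 0 {
		return []byte{}, nil
	}
	lb.length -= n
	for lb.read.off == len(lb.read.buf) { // skip drained nodes
		lb.read = lb.read.next
	}
	if cur := lb.read; len(cur.buf)-cur.off >= n {
		p := cur.buf[cur.off : cur.off+n]
		cur.off += n
		return p, nil
	}
	lb.copies++
	p := make([]byte, 0, n)
	for len(p) < n {
		for lb.read.off == len(lb.read.buf) {
			lb.read = lb.read.next
		}
		cur := lb.read
		k := len(cur.buf) - cur.off
		if rest := n - len(p); k > rest {
			k = rest
		}
		p = append(p, cur.buf[cur.off:cur.off+k]...)
		cur.off += k
	}
	return p, nil
}

func main() {
	lb := newLinkBuf("hel", "lo wor", "ld")
	for _, n := range []int{2, 4, 5} {
		p, _ := lb.Next(n)
		fmt.Printf("%q ", p)
	}
	fmt.Println("copies:", lb.copies) // "he" "llo " "world" copies: 2
}
```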

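Listener and Connection never touch the Poller directly. Instead, each hands its FD over to an FDOperator proxy, which decouples them from the Poller.
The Poller, in turn, knows nothing about the upper layers: it simply fires the corresponding callback on whichever FD triggered a read/write event, which keeps the logic clean.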
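This decoupling can be shown with a toy poller that routes events to callbacks by fd. Everything here is hypothetical: `operator` keeps only the FD and the OnRead/OnWrite/OnHup callbacks of FDOperator, and events are passed in by the caller instead of coming from epoll_wait or kqueue.

```go
package main

import "fmt"

type eventKind int

const (
	evRead eventKind = iota
	evWrite
	evHup
)

// operator is a hypothetical reduction of FDOperator: the poller only sees an
// fd and callbacks, never the Listener or Connection behind them.
type operator struct {
	FD      int
	OnRead  func() error
	OnWrite func() error
	OnHup   func() error
}

type event struct {
	fd   int
	kind eventKind
}

// poller routes ready events to operators by fd. Real events would come from
// epoll_wait/kqueue; here the caller passes them in.
type poller struct {
	ops map[int]*operator
}

func (p *poller) Register(op *operator) { p.ops[op.FD] = op }

func (p *poller) Dispatch(events []event) {
	for _, ev := range events {
		op, ok := p.ops[ev.fd]
		if !ok {
			continue // fd already detached
		}
		var cb func() error
		switch ev.kind {
		case evRead:
			cb = op.OnRead
		case evWrite:
			cb = op.OnWrite
		case evHup:
			cb = op.OnHup
			delete(p.ops, ev.fd)
		}
		if cb != nil {
			_ = cb() // the poller does not inspect the result
		}
	}
}

func main() {
	p := &poller{ops: map[int]*operator{}}
	// A "listener" and a "connection" each hand their fd over via an operator.
	p.Register(&operator{FD: 3, OnRead: func() error { fmt.Println("fd 3: accept"); return nil }})
	p.Register(&operator{
		FD:     7,
		OnRead: func() error { fmt.Println("fd 7: data"); return nil },
		OnHup:  func() error { fmt.Println("fd 7: closed"); return nil },
	})
	// The final read on fd 7 is dropped because the fd was detached on hang-up.
	p.Dispatch([]event{{3, evRead}, {7, evRead}, {7, evHup}, {7, evRead}})
}
```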

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
   // FD is file descriptor, poll will bind when register.
   FD int
   // The FDOperator provides three operations of reading, writing, and hanging.
   // The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator.
   // Read callback, typically used by Listener
   OnRead  func(p Poll) error
   // Write callback, used by Dialer
   OnWrite func(p Poll) error
   // Hang-up callback, invoked when the FD is detached
   OnHup   func(p Poll) error
   // The following fns are required and must be set before use; otherwise the poller panics.
   // They are only called by the poll when it handles connection events.
   // Read callback, used by Connection (provides the buffers the poller reads into)
   Inputs   func(vs [][]byte) (rs [][]byte)
   // Read ACK: after N bytes have been read from the kernel, reports those N bytes as successfully read
   InputAck func(n int) (err error)
   // Outputs locks the buffer if len(rs) > 0; it must be unlocked by OutputAck.
   // Write callback, used by Connection
   Outputs   func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
   // Write ACK: after N bytes have been sent to the kernel, reports those N bytes as successfully written
   OutputAck func(n int) (err error)
   // poll is the registered location of the file descriptor,
   // i.e. the poller this FD is attached to.
   poll Poll
   // private, used by operatorCache
   // next links to the following FDOperator in the cache
   next  *FDOperator
   state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
}
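// Control applies a poll event to the FDOperator, registering (or detaching) the corresponding listening
func (op *FDOperator) Control(event PollEvent) error
// do marks the FDOperator as processing an IO event
func (op *FDOperator) do() (can bool)
// done marks the end of IO event processing and returns the FDOperator to the in-use state
func (op *FDOperator) done()
// inuse moves the FDOperator from the unused state to the in-use state
func (op *FDOperator) inuse()
// unused moves the FDOperator from the in-use state back to the unused state
func (op *FDOperator) unused()
// isUnused reports whether the FDOperator is in the unused state
func (op *FDOperator) isUnused() bool
// reset resets the FDOperator
func (op *FDOperator) reset()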
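The state field's three values (0 unused, 1 in use, 2 processing) suggest a CAS state machine like the one below. This is a sketch under that assumption, not netpoll's code; the type `op` and the constant names are hypothetical.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// The three values listed next to FDOperator.state.
const (
	stateUnused     int32 = 0 // parked in the operator cache
	stateInuse      int32 = 1 // bound to an fd, idle
	stateProcessing int32 = 2 // an IO event is being handled ("do-done")
)

type op struct{ state int32 }

// do claims the operator for one event; false means it is unbound or busy.
func (o *op) do() bool {
	return atomic.CompareAndSwapInt32(&o.state, stateInuse, stateProcessing)
}

// done hands the operator back after the event is handled.
func (o *op) done() { atomic.StoreInt32(&o.state, stateInuse) }

// inuse and unused yield and retry while an event is being processed.
func (o *op) inuse() {
	for !atomic.CompareAndSwapInt32(&o.state, stateUnused, stateInuse) {
		if atomic.LoadInt32(&o.state) == stateInuse {
			return
		}
		runtime.Gosched()
	}
}

func (o *op) unused() {
	for !atomic.CompareAndSwapInt32(&o.state, stateInuse, stateUnused) {
		if atomic.LoadInt32(&o.state) == stateUnused {
			return
		}
		runtime.Gosched()
	}
}

func (o *op) isUnused() bool { return atomic.LoadInt32(&o.state) == stateUnused }

func main() {
	o := &op{}
	fmt.Println(o.do()) // false: an unused operator cannot take events
	o.inuse()
	fmt.Println(o.do()) // true
	fmt.Println(o.do()) // false: already processing
	o.done()
	o.unused()
	fmt.Println(o.isUnused()) // true
}
```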
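// operatorCache pools FDOperators (object pooling)
type operatorCache struct {
   locked int32
   first  *FDOperator // head of the list of available FDOperators
   cache  []*FDOperator // keeps every allocated FDOperator reachable, so the GC never frees one that epoll/kqueue still references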
}
func (c *operatorCache) alloc() *FDOperator {
   c.lock()
   if c.first == nil {
      // allocate a batch of FDOperators (about block4k bytes' worth) at once
      const opSize = unsafe.Sizeof(FDOperator{})
      n := block4k / opSize
      if n == 0 {
         n = 1
      }
      // Must be in non-GC memory because can be referenced
      // only from epoll/kqueue internals.
      for i := uintptr(0); i < n; i++ {
         pd := &FDOperator{}
         c.cache = append(c.cache, pd)
         pd.next = c.first
         c.first = pd
      }
   }
   op := c.first
   c.first = op.next
   c.unlock()
   return op
}
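The excerpt above relies on `lock`, `unlock` and `block4k`, which are not shown. A self-contained sketch fills them in with assumptions: `lock` is a CAS spin lock on `locked`, `block4k` is 4 KiB, and `free` is a hypothetical counterpart to `alloc` that pushes a reset operator back onto the free list.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"unsafe"
)

const block4k = 4 * 1024 // assumed batch size in bytes

type operator struct {
	FD   int
	next *operator
}

type operatorCache struct {
	locked int32
	first  *operator   // head of the free list
	cache  []*operator // every operator ever allocated, kept reachable
}

// lock is a CAS spin lock on `locked`.
func (c *operatorCache) lock() {
	for !atomic.CompareAndSwapInt32(&c.locked, 0, 1) {
		runtime.Gosched()
	}
}

func (c *operatorCache) unlock() { atomic.StoreInt32(&c.locked, 0) }

// alloc pops a free operator, refilling the list with ~4 KiB of operators when empty.
func (c *operatorCache) alloc() *operator {
	c.lock()
	defer c.unlock()
	if c.first == nil {
		n := block4k / unsafe.Sizeof(operator{})
		if n == 0 {
			n = 1
		}
		for i := uintptr(0); i < n; i++ {
			op := &operator{}
			c.cache = append(c.cache, op)
			op.next = c.first
			c.first = op
		}
	}
	op := c.first
	c.first = op.next
	op.next = nil
	return op
}

// free resets op and pushes it back on the free list (hypothetical counterpart of alloc).
func (c *operatorCache) free(op *operator) {
	*op = operator{}
	c.lock()
	op.next = c.first
	c.first = op
	c.unlock()
}

func main() {
	var c operatorCache
	a := c.alloc()
	fmt.Println(len(c.cache)) // batch size: 4096 / unsafe.Sizeof(operator{})
	c.free(a)
	fmt.Println(c.alloc() == a) // true: freed operators are reused first
}
```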
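When a read event occurs, the FDOperator acts as a proxy: it reads the byte stream into the Connection's NoCopy buffer
and acknowledges the read. Once the acknowledgement completes, OnEvent submits a task to the Routine Pool for execution.
The Connection itself only needs to provide read/write interfaces to the upper layer, manage its own buffer, and, at read-ack time, tell OnEvent that a request has arrived. This keeps its logic simple and self-contained.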
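The read path can be sketched as follows. `conn`, `Inputs`, `InputAck` and `pollRead` are hypothetical simplifications: the real FDOperator.Inputs takes and returns `[][]byte`, and `copy` stands in for the readv system call the poller would issue.

```go
package main

import "fmt"

// conn is a hypothetical connection exposing the two hooks the poller drives.
type conn struct {
	buf      []byte // pre-allocated input space, standing in for the NoCopy buffer
	readable int    // bytes acknowledged and visible to the upper layer
	onEvent  func(c *conn)
}

// Inputs hands the poller the free tail of the buffer to read into.
func (c *conn) Inputs() [][]byte { return [][]byte{c.buf[c.readable:]} }

// InputAck commits n freshly read bytes, then notifies the event layer.
func (c *conn) InputAck(n int) error {
	c.readable += n
	c.onEvent(c)
	return nil
}

// pollRead simulates one read event: copy stands in for the readv syscall.
func pollRead(c *conn, kernel []byte) {
	vs := c.Inputs()
	n := copy(vs[0], kernel)
	_ = c.InputAck(n)
}

func main() {
	c := &conn{buf: make([]byte, 64)}
	c.onEvent = func(c *conn) {
		// netpoll would submit OnRequest to the goroutine pool at this point.
		fmt.Printf("readable: %q\n", c.buf[:c.readable])
	}
	pollRead(c, []byte("GET /"))
	pollRead(c, []byte(" HTTP/1.1"))
}
```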

// onEvent is the collection of event processing.
// OnPrepare, OnRequest, CloseCallback share the lock processing,
// which is a CAS lock and can only be cleared by OnRequest.
type onEvent struct {
   ctx               context.Context
   onConnectCallback atomic.Value
   onRequestCallback atomic.Value
   closeCallbacks    atomic.Value // value is latest *callbackNode
}
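// Called when the connection is closed
type CloseCallback func(connection Connection) error
// Called while the Connection is being initialized: injects parameters into the connection and creates a context for OnConnect to use
type OnPrepare func(connection Connection) context.Context
// Called once the Connection is ready: OnConnect creates a context for OnRequest to use
type OnConnect func(ctx context.Context, connection Connection) context.Context
// Called when data arrives (a read event). On error the handler should close the connection itself; the returned error is not processed.
// Only one OnRequest runs per connection at a time; if it neither closes the connection nor reads all the data, the goroutine loops forever.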
type OnRequest func(ctx context.Context, connection Connection) error
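Both the one-OnRequest-at-a-time guarantee and the warning that unread data causes an endless loop follow from a CAS-locked drain loop. The sketch below models unread data as a byte counter; `connState`, `onReadable` and the `handle` callback are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// connState is hypothetical: pending counts unread bytes and processing is the
// CAS lock that allows only one OnRequest per connection at a time.
type connState struct {
	pending    int64
	processing int32
}

// onReadable runs for every read event. handle plays OnRequest and returns how
// many bytes it consumed; if it never consumes anything, the loop never ends.
func (c *connState) onReadable(handle func(avail int64) (consumed int64)) {
	if !atomic.CompareAndSwapInt32(&c.processing, 0, 1) {
		return // another goroutine is already running the handler
	}
	for {
		for n := atomic.LoadInt64(&c.pending); n > 0; n = atomic.LoadInt64(&c.pending) {
			atomic.AddInt64(&c.pending, -handle(n))
		}
		atomic.StoreInt32(&c.processing, 0)
		// Data may have arrived after the last check but before the unlock.
		if atomic.LoadInt64(&c.pending) == 0 || !atomic.CompareAndSwapInt32(&c.processing, 0, 1) {
			return
		}
	}
}

func main() {
	c := &connState{}
	var handled int64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&c.pending, 10) // 10 new bytes arrive
			c.onReadable(func(avail int64) int64 {
				atomic.AddInt64(&handled, avail)
				return avail
			})
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&handled)) // 1000
}
```

The re-check after unlocking closes the window where data arrives between the last loop test and the unlock; without it, that data could sit unprocessed until the next read event.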
Each component implements a Close method; calling Close triggers a chain of Close calls on its internal members.

// Close this server with deadline.
func (s *server) Close(ctx context.Context) error {
   s.operator.Control(PollDetach)
   s.ln.Close()
   var ticker = time.NewTicker(time.Second)
   defer ticker.Stop()
   var hasConn bool
   for {
      hasConn = false
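      // iterate over all connections and close the idle ones (and any that cannot report idleness); keep polling until every connection has been closed safely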
      s.connections.Range(func(key, value interface{}) bool {
         var conn, ok = value.(gracefulExit)
         if !ok || conn.isIdle() {
            value.(Connection).Close()
         }
         hasConn = true
         return true
      })
      if !hasConn { // all connections have been closed
         return nil
      }
      select {
      case <-ctx.Done():
         return ctx.Err()
      case <-ticker.C:
         continue
      }
   }
}
// onClose means close by user.
func (c *connection) onClose() error {
   // close the connection, recording the user as the closer
   if c.closeBy(user) {
      // If Close is called during OnPrepare, poll is not registered.
      if c.operator.poll != nil {
         c.operator.Control(PollDetach)
      }
      // wake up any reads and writes blocked on this connection
      c.triggerRead()
      c.triggerWrite(ErrConnClosed)
      c.closeCallback(true) // true: compete with OnRequest for the lock, so the close callbacks never run while OnRequest is executing
      return nil
   }
   if c.isCloseBy(poller) {
      // Connection with OnRequest of nil
      // relies on the user to actively close the connection to recycle resources.
      c.closeCallback(true)
   }
   return nil
}
// closeCallback .
// It can be confirmed that closeCallback and onRequest will not be executed concurrently.
// If onRequest is still running, it will trigger closeCallback on exit.
func (c *connection) closeCallback(needLock bool) (err error) {
   if needLock && !c.lock(processing) {
      return nil
   }
   var latest = c.closeCallbacks.Load()
   if latest == nil {
      return nil
   }
   for callback := latest.(*callbackNode); callback != nil; callback = callback.pre {
      callback.fn(c)
   }
   return nil
}
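// gracefulExit lets the server check whether a connection is idle, so that connections are closed, where possible, only once they are idle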
type gracefulExit interface {
   isIdle() (yes bool)
   Close() (err error)
}
// callbackNode links close callbacks into a stack; they are invoked in LIFO order
type callbackNode struct {
   fn  CloseCallback
   pre *callbackNode
}
// CloseCallback will be called when the connection is closed.
// Return: error is unused which will be ignored directly.
type CloseCallback func(connection Connection) error
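Registering close callbacks onto this stack without a mutex can be sketched with a CAS loop on the atomic.Value. `conn`, `AddCloseCallback` and `runCloseCallbacks` are hypothetical names, and `CloseCallback` here takes the toy `*conn` instead of a `Connection`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a hypothetical stand-in for the connection side.
type conn struct {
	closeCallbacks atomic.Value // latest *callbackNode
}

type CloseCallback func(c *conn) error

type callbackNode struct {
	fn  CloseCallback
	pre *callbackNode
}

// AddCloseCallback pushes fn onto the stack with a CAS loop, so concurrent
// registrations are not lost.
func (c *conn) AddCloseCallback(fn CloseCallback) {
	for {
		old, _ := c.closeCallbacks.Load().(*callbackNode)
		node := &callbackNode{fn: fn, pre: old}
		if old == nil {
			// An empty atomic.Value only matches an untyped nil, not (*callbackNode)(nil).
			if c.closeCallbacks.CompareAndSwap(nil, node) {
				return
			}
		} else if c.closeCallbacks.CompareAndSwap(old, node) {
			return
		}
	}
}

// runCloseCallbacks walks from the newest node back, so callbacks run LIFO.
func (c *conn) runCloseCallbacks() {
	latest, _ := c.closeCallbacks.Load().(*callbackNode)
	for cb := latest; cb != nil; cb = cb.pre {
		_ = cb.fn(c) // errors are ignored, as documented for CloseCallback
	}
}

func main() {
	c := &conn{}
	c.AddCloseCallback(func(*conn) error { fmt.Println("registered first"); return nil })
	c.AddCloseCallback(func(*conn) error { fmt.Println("registered second"); return nil })
	c.runCloseCallbacks() // prints "registered second", then "registered first"
}
```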
            
Study hard and make progress every day.