首页后端开发其他后端知识Golang中channel的原理是什么?

Golang中channel的原理是什么?

时间2024-03-26 12:32:03发布访客分类其他后端知识浏览1523
导读:这篇文章主要给大家介绍“Golang中channel的原理是什么?”的相关知识,下文通过实际案例向大家展示操作过程,内容简单清晰,易于学习,有这方面学习需要的朋友可以参考,希望这篇“Golang中channel的原理是什么?”文章能对大家有...
这篇文章主要给大家介绍“Golang中channel的原理是什么?”的相关知识,下文通过实际案例向大家展示操作过程,内容简单清晰,易于学习,有这方面学习需要的朋友可以参考,希望这篇“Golang中channel的原理是什么?”文章能对大家有所帮助。




数据结构

channel的数据结构在$GOROOT/src/runtime/chan.go文件下:

type hchan struct {


   qcount   uint           // 当前队列中剩余元素个数

   dataqsiz uint           // 环形队列长度,即可以存放的元素个数

   buf      unsafe.Pointer // 环形队列指针

   elemsize uint16         // 每个元素的大小

   closed   uint32         // 标记是否关闭

   elemtype *_type         // 元素类型

   sendx    uint           // 队列下标,指向元素写入时存放到队列中的位置

   recvx    uint           // 队列下标,指向元素从队列中读出的位置

   recvq    waitq          // 等待读消息的groutine队列

   sendq    waitq          // 等待写消息的groutine队列

   lock     mutex          // 互斥锁

}

chan内部实现了一个环形队列作为缓冲区,队列的长度在创建chan时指定:

等待队列(recvq/sendq)使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog结构:

type waitq struct {

   first *sudog
   last  *sudog
}


type sudog struct {

   g            *g
   next         *sudog
   prev         *sudog
   elem         unsafe.Pointer // data element (may point to stack)
   
   acquiretime  int64
   releasetime  int64
   ticket       uint32
   isSelect     bool
   
   parent       *sudog // semaRoot binary tree
   waitlink     *sudog // g.waiting list or semaRoot
   waittail     *sudog // semaRoot
   c            *hchan // channel
}

创建channel

通常使用make(channel string, 0)的方式创建无缓存的channel,使用make(channel string, 10)创建有缓存的channel。

源码:

func makechan(t *chantype, size int) *hchan {
    
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >
= 116 {

      throw("makechan: invalid channel element type")
   }
    
   if hchanSize%maxAlign != 0 || elem.align >
 maxAlign {

      throw("makechan: bad alignment")
   }
    

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem >
 maxAlloc-hchanSize || size  0 {

      panic(plainError("makechan: size out of range"))
   }

   var c *hchan
   switch {

   
   case mem == 0:
   // 如果当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间;
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
   // 如果当前 Channel 中存储的类型不是指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间;
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
   //单独为 runtime.hchan 和缓冲区分配内存;
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }
    

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&
    c.lock, lockRankHchan)
   // 在函数的最后会统一更新elemsize、elemtype 和 dataqsiz 几个字段;

   if debugChan {
    
      print("makechan: chan=", c, ";
     elemsize=", elem.size, ";
 dataqsiz=", size, "\n")
   }

   return c
}

channel读写

  1. 当有新数据来时,首先判断recvq中是否有groutine存在,如果recvq不为空,则说明缓冲区为空,或者没有缓冲区,因为如果缓冲区有数据会被recvq里面的groutine消费。此时从recvq中拿出一个groutine并绑定数据,唤醒该groutine执行任务,这个过程跳过了将数据写入缓冲区的过程。
  2. 如果缓冲区有数据并有空余位置,将数据放入缓冲区。
  3. 如果缓冲区有数据但没有空余位置,当前groutine绑定数据并放入sendx,进入睡眠,等待被唤醒。

源码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    
   .....
   lock(&
c.lock)

   if c.closed != 0 {
    
      unlock(&
c.lock)
      panic(plainError("send on closed channel"))
   }
    

   // 如果Channel 没有被关闭并且已经有处于读等待的 Goroutine,
   // 那么从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据
   if sg := c.recvq.dequeue();
 sg != nil {

      send(c, sg, ep, func() {
     unlock(&
c.lock) }
, 3)
      return true
   }

   
   // 如果recvq为空且缓冲区中还有剩余空间
   if c.qcount  c.dataqsiz {

   // 计算出下一个可以存储数据的位置,
      qp := chanbuf(c, c.sendx)
      // raceenabled: 是否启用数据竞争检测,在编译时指定,默认为false
      if raceenabled {

      // 发出数据竞争警告
         raceacquire(qp)
         racerelease(qp)
      }

      // 将发送的数据拷贝到缓冲区中,产生内存拷贝
      typedmemmove(c.elemtype, qp, ep)
      // 增加 sendx 索引
      c.sendx++
      if c.sendx == c.dataqsiz {

         c.sendx = 0
      }
    
      // 增加计数器
      c.qcount++
      unlock(&
c.lock)
      return true
   }

   
   if !block {
    
      unlock(&
c.lock)
      return false
   }


   // 将channel数据绑定到当前groutine并使groutine休眠
   // 获取发送数据使用的 Goroutine
   gp := getg()
   // 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,
   // 例如发送的 Channel、是否在 select 中和待发送数据的内存地址等
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {

      mysg.releasetime = -1
   }
    
   // 将刚刚创建并初始化的 mysg 加入发送等待队列,并设置到当前 Goroutine的waiting上,
   // 表示 Goroutine 正在等待该sudog准备就绪
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   // 休眠groutine
   gopark(chanparkcommit, unsafe.Pointer(&
c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // 保证传入的数据不被GC
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting {

      throw("G waiting list is corrupted")
   }

   gp.waiting = nil
   gp.activeStackChans = false
   if gp.param == nil {

      if c.closed == 0 {

         throw("chansend: spurious wakeup")
      }

      panic(plainError("send on closed channel"))
   }
    
   gp.param = nil
   if mysg.releasetime >
 0 {

      blockevent(mysg.releasetime-t0, 2)
   }

   mysg.c = nil
   releaseSudog(mysg)
   return true
}

  1. 如果sendx不为空且缓冲区不为空,从缓冲区头部读出数据并在当前G执行任务,在sendx中拿出一个G,将其数据写入缓冲区尾部并唤醒该G。
  2. 如果sendx不为空且缓冲区为空,直接从sendx中拿出一个G,将G中数据取出并唤醒该G。
  3. 如果sendx为空且缓冲区不为空,则从缓冲区头部拿出一个数据。
  4. 如果sendx为空且缓冲区为空,将该G放入recvq,进入休眠,等待被唤醒。

源码:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

 // block:这次接收是否阻塞
   if debugChan {

      print("chanrecv: chan=", c, "\n")
   }


   if c == nil {

      if !block {

         return
      }

      // 从一个空 Channel 接收数据时会直接让出处理器的使用权
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
    

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   if !block &
    &
 empty(c) {
    
     // 如果channel为空并且未关闭,直接返回
      if atomic.Load(&
c.closed) == 0 {

         return
      }


      if empty(c) {

         // The channel is irreversibly closed and empty.
         if raceenabled {

            raceacquire(c.raceaddr())
         }

         if ep != nil {

         // 手动标记清楚对象
            typedmemclr(c.elemtype, ep)
         }

         return true, false
      }

   }
    

   var t0 int64
   if blockprofilerate >
 0 {

      t0 = cputicks()
   }
    

   lock(&
    c.lock)
    //如果channel为空,并且已关闭,说明对象不可达
   if c.closed != 0 &
    &
 c.qcount == 0 {

      if raceenabled {

         raceacquire(c.raceaddr())
      }
    
      unlock(&
c.lock)
      if ep != nil {

      // 手动标记清除
         typedmemclr(c.elemtype, ep)
      }

      return true, false
   }
    
    // 如果sendq不为空,直接消费,避免sendq -->
     queue -->
     recvx的过程
   if sg := c.sendq.dequeue();
 sg != nil {

      recv(c, sg, ep, func() {
     unlock(&
c.lock) }
, 3)
      return true, true
   }
    
    
    // 当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 
    // recvx 的索引位置中取出数据进行处理
   if c.qcount >
 0 {

      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {

         raceacquire(qp)
         racerelease(qp)
      }

      // 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove将缓冲区中的数据拷贝到内存中
      if ep != nil {

         typedmemmove(c.elemtype, ep, qp)
      }

      // 使用 runtime.typedmemclr清除队列中的数据并完成收尾工作
      typedmemclr(c.elemtype, qp)
      c.recvx++
      // recvx位置归零
      if c.recvx == c.dataqsiz {

         c.recvx = 0
      }
    
      c.qcount-- // 计数减一
      unlock(&
c.lock) 
      return true, true
   }


   if !block {
    
      unlock(&
c.lock)
      return false, false
   }


   // 当 sendq不为空 并且缓冲区中也不存在任何数据时,阻塞并休眠当前groutine
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {

      mysg.releasetime = -1
   }
    
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)
   gopark(chanparkcommit, unsafe.Pointer(&
c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

   // someone woke us up
   if mysg != gp.waiting {

      throw("G waiting list is corrupted")
   }
    
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime >
 0 {

      blockevent(mysg.releasetime-t0, 2)
   }

   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}
    

以上就是关于“Golang中channel的原理是什么?”的介绍了,感谢各位的阅读,希望文本对大家有所帮助。如果想要了解更多知识,欢迎关注网络,小编每天都会为大家更新不同的知识。

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: Golang中channel的原理是什么?
本文地址: https://pptw.com/jishu/653501.html
Go语言中的并发编程 sync.Once是什么? 用PHP怎样写一个考试时间倒计时功能?

游客 回复需填写必要信息