侧边栏壁纸
  • 累计撰写 98 篇文章
  • 累计创建 20 个标签
  • 累计收到 3 条评论
Go

channel 底层原理CSP

林贤钦
2022-04-25 / 0 评论 / 2 点赞 / 216 阅读 / 2,590 字
温馨提示:
本文最后更新于 2022-04-25,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

channel 底层原理CSP

一、CSP是什么

CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。

在文章中,CSP 也是一门自定义的编程语言,作者定义了输入输出语句,用于 processes 间的通信(communicatiton)。processes 被认为是需要输入驱动,并且产生输出,供其他 processes 消费,processes 可以是进程、线程、甚至是代码块。输入命令是:!,用来向 processes 写入;输出是:?,用来从 processes 读出。

Go 一开始就把 CSP 的思想融入到语言的核心里,所以并发编程成为 Go 的一个独特的优势,而且很容易理解。

大多数的编程语言的并发编程模型是基于线程和内存同步访问控制,Go 的并发编程的模型则用 goroutine 和 channel 来替代。Goroutine 和线程类似,channel 和 mutex (用于内存同步访问控制)类似。

Goroutine 解放了程序员,让我们更能贴近业务去思考问题。而不用考虑各种像线程库、线程开销、线程调度等等这些繁琐的底层问题,goroutine 天生替你解决好了。

Channel 则天生就可以和其他 channel 组合。我们可以把收集各种子系统结果的 channel 输入到同一个 channel。Channel 还可以和 select, cancel, timeout 结合起来。而 mutex 就没有这些功能。

Go 的并发原则非常优秀,目标就是简单:尽量使用 channel;把 goroutine 当作免费的资源,随便用。

二、channel 底层的数据结构是什么

底层数据结构需要看源码,版本为 go 1.9.2:

type hchan struct {
    // chan 里元素数量
    qcount   uint
    // chan 底层循环数组的长度
    dataqsiz uint
    // 指向底层循环数组的指针
    // 只针对有缓冲的 channel
    buf      unsafe.Pointer
    // chan 中元素大小
    elemsize uint16
    // chan 是否被关闭的标志
    closed   uint32
    // chan 中元素类型
    elemtype *_type // element type
    // 已发送元素在循环数组中的索引
    sendx    uint   // send index
    // 已接收元素在循环数组中的索引
    recvx    uint   // receive index
    // 等待接收的 goroutine 队列
    recvq    waitq  // list of recv waiters
    // 等待发送的 goroutine 队列
    sendq    waitq  // list of send waiters
    // 保护 hchan 中所有字段
    lock mutex
}

buf 指向底层循环数组,只有缓冲型的 channel 才有。

sendxrecvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。

sendqrecvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。

waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装

type waitq struct {
    first *sudog
    last  *sudog
}

lock 用来保证每个读 channel 或写 channel 的操作都是原子的

例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :

image-20220425220019809

三、channel创建

通道有两个方向,发送和接收。理论上来说,我们可以创建一个只发送或只接收的通道,但是这种通道创建出来后,怎么使用呢?一个只能发的通道,怎么接收呢?同样,一个只能收的通道,如何向其发送数据呢?

一般而言,使用 make 创建一个能收能发的通道:

// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

通过汇编分析,我们知道,最终创建 chan 的函数是 makechan

func makechan(t *chantype, size int64) *hchan

从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)
func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // 省略检查越界代码...

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }
	
   var c *hchan
   switch {
   case mem == 0:
      // 队列或者元素大小为0,只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{}) 
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // 进行两次内存分配操作
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   // 循环数组长度
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)
   // 返回 hchan 指针
   return c
}
2

评论区