· code

Golang Channel 妙用

Golang 中通过我们使用 Channel 来传递信息、信号,经典的如生产者消费者、退出信号等, 那么除此之外 Channel 还有哪些不常见的用法。

限制并发数

Golang 原生提供了强大的并发原语,但如果无节制的使用大量 Goroutine,并发过大会造成资源浪费,严重时会导致程序崩溃。使用带缓冲区的 Channel 可以解决此类问题。

在 Golang 的godoc/gatevfs中实现了对最大虚拟文件的并发限制。

// New returns a new FileSystem that delegates to fs.
// If gateCh is non-nil and buffered, it's used as a gate
// to limit concurrency on calls to fs.
func New(fs vfs.FileSystem, gateCh chan bool) vfs.FileSystem {
	if cap(gateCh) == 0 {
		return fs
	}
	return gatefs{fs, gate(gateCh)}
}

type gate chan bool

func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }

通过带缓存的 Channel,每次打开文件时调用enter发生数据到 Channel,当文件关闭时调用leave读取 Channel 数据,当前 Channel 满后再次读取变会阻塞,直到有资源被释放,从而达到限制并发数的目的。

func (fs gatefs) Open(p string) (vfs.ReadSeekCloser, error) {
	fs.enter()
	defer fs.leave()
	rsc, err := fs.fs.Open(p)
	if err != nil {
		return nil, err
	}
	return gatef{rsc, fs.gate}, nil
}

可以配合 WaitGroup 来实现最大并发数的控制,具体代码如下:

// control number
type Gocontrol struct {
	ch chan struct{}
	wg sync.WaitGroup
}

func NewGocontrol(number int) *Gocontrol {
	return &Gocontrol{
		ch: make(chan struct{}, number),
	}
}

func (g *Gocontrol) Enter() {
	g.ch <- struct{}{}
}

func (g *Gocontrol) Leave() {
	<-g.ch
}

func (g *Gocontrol) Run(f func()) {
	g.Enter()
	g.wg.Add(1)

	go func() {
		defer g.Leave()
		defer g.wg.Done()
		f()
	}()
}

func (g *Gocontrol) Wait() {
	g.wg.Wait()
}

测试运行,创建 100 个任务,调用Gocontrol限制最大并发为 10,运行runtime.NumGoroutine来获取当前 Goroutine 数

func RunGocontrol() {
	go func() {
		for {
			fmt.Printf("goroutine numbers: %v\n", runtime.NumGoroutine())
			time.Sleep(500 * time.Millisecond)
		}
	}()

	gctl := NewGocontrol(10)

	start := time.Now()
	for i := 0; i < 100; i++ {
		n := i
		gctl.Run(func() {
			time.Sleep(1 * time.Second)
            fmt.Println("hello", n)
		})
	}

	gctl.Wait()

	dur := time.Since(start)
	fmt.Printf("run time: %v", dur)
}

运行结果显示,最大 Goroutine 数为 12(包含 1 个主线程,1 个监控线程,10 个任务线程),符合预期

goroutine numbers: 12
run time: 10.002604769s

实现锁

除了调用 sync 包,使用 Channel 也可以实现锁,以互斥锁为例:

type ChLock chan struct{}

func NewChLock() ChLock {
	return make(chan struct{}, 1)
}

func (l ChLock) Lock() {
	l <- struct{}{}
}

func (l ChLock) Unlock() {
	<-l
}

互斥锁通过容量为 1 的 Channel 实现互斥,同样借助多个 Channel 可以使用读写锁,通过关闭 Channel 可以实现类型 Once 的功能。

从源码层面分析,Channel 其实是一个线程安全的环形队列,Channel 定义在runtime/chan.go中:

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	lock mutex
}

其中包含了lock锁结构,这里的mutex不同于sync.Mutex,只在 Runtime 内部使用是一种低阶的同步原语,也没有提供 Lock/Unlock 方法,只能通过全局的 lock/unlock/initlock 等函数调用。

最后

本文介绍了一些 Channel 的妙用,限制并发数与实现锁等,通过示例及源码阐述其深层次的原因。

Explore more in https://qingwave.github.io