go多协程、协程组以及限流

前言

 因为还没有深入研究过go的协程实现机制,所以这里只是简单表述协程的使用方法。

多协程执行

func main() {
	wg := &sync.WaitGroup{}
	var task = printHello

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			task()
		}()
	}

	wg.Wait()
	fmt.Println("hhh")
}
func printHello() {
	now := time.Now()
	fmt.Printf("Hello,now is: %d-%d-%d
", now.Year(), now.Month(), now.Day())
}

  

  上面是使用go的协程的常用例子,使用go关键字创建新的协程执行任务,使用WaitGroup保证当所有协程都执行完毕的时候才会执行到fmt.Println("hhh")。这种方式有个弊端,由于没有控制协程数,有可能导致协程暴增。

 

协程组  

协程组是使用一组协程执行任务的,可以看作是协程池。协程组规定了最多可以同时执行任务的协程数,避免了直接使用go关键字导致的协程暴增问题。这里先介绍第一种实现,这里要最需要注意的是Start方法中使用的ch变量。

 

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task func()

type Pool struct {
	ConNum int // 并发数
	Task   chan Task // 任务channel
	Wg     *sync.WaitGroup // 同步组
}

func (p *Pool) init(conNum int, wg *sync.WaitGroup) {
	p.ConNum = conNum
	p.Wg = wg
	p.Task = make(chan Task)
}

func (p *Pool) Start() {
	ch := make(chan struct{}, p.ConNum) // 控制并发数
	for task := range p.Task {
		ch <- struct{}{}
		p.Wg.Add(1)
		go func() {
			defer p.Wg.Done()
			task()
			<-ch
		}()
	}
}

// 添加执行任务
func (p *Pool) Execute(task Task) {
	p.Task <- task
}

func main() {
	pool := &Pool{}
	pool.init(10, &sync.WaitGroup{})
	go pool.Start()

	start := time.Now()
	for i := 0; i < 1000; i++ {
		fn := func() {
			time.Sleep(time.Millisecond)
		}
		pool.Execute(fn)
	}
	pool.Wg.Wait()
	fmt.Println("tast cost: ", time.Since(start))

	start = time.Now()
	for i := 0; i < 1000; i++ {
		time.Sleep(time.Millisecond)
	}
	fmt.Println("tast cost: ", time.Since(start))
}

 输出:

tast cost:  11.880283ms
tast cost:  1.261029362s

从上面的耗时可以看到,使用协程组确实缩短了耗时。这里使用了最多10个协程,可以对比发现,与不使用协程相比,约为其1/10,证明协程组确实有效。

 协程组的第二种实现可以理解为常规的“线程池”,其中的协程是复用的。两种实现方式的耗时相当。

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task func()

type Pool struct {
	ConNum int             // 并发数
	Task   chan Task       // 任务channel
	Wg     *sync.WaitGroup // 同步组
}

func (p *Pool) init(conNum int, wg *sync.WaitGroup) {
	p.ConNum = conNum
	p.Wg = wg
	p.Task = make(chan Task)
}

func (p *Pool) Start() {
	ch := make(chan struct{}, p.ConNum) // 控制并发数
	for task := range p.Task {
		ch <- struct{}{}
		p.Wg.Add(1)
		go func() {
			defer p.Wg.Done()
			task()
			<-ch
		}()
	}
}

// 添加执行任务
func (p *Pool) Execute(task Task) {
	p.Task <- task
}

type Pool2 struct {
	ConNum int             // 并发数
	Task   chan Task       // 任务channel
	Wg     *sync.WaitGroup // 同步组
	Mx     *sync.Mutex
}

func (p *Pool2) init(conNum int, wg *sync.WaitGroup) {
	p.ConNum = conNum
	p.Wg = wg
	p.Task = make(chan Task)
	p.Mx = &sync.Mutex{}
}

func (p *Pool2) Start() {
	go func() {
		for i := 0; i < p.ConNum; i++ {
			p.Wg.Add(1)
			go func() {
				defer p.Wg.Done()
				for task := range p.Task {
					task()
				}
			}()
		}
	}()
}

// 添加执行任务
func (p *Pool2) Execute(task Task) {
	p.Mx.Lock()
	defer p.Mx.Unlock()
	if p.Task != nil {
		p.Task <- task
	}
}

func (p *Pool2) Done() {
	p.Mx.Lock()
	defer p.Mx.Unlock()
	close(p.Task)
	p.Task = nil
	p.Wg.Wait()
}

func main() {
	taskNum := 10000000
	conNum := 1000
	pool := &Pool{}
	pool.init(conNum, &sync.WaitGroup{})
	go pool.Start()

	start := time.Now()
	for i := 0; i < taskNum; i++ {
		fn := func() {
			time.Sleep(time.Millisecond)
		}
		pool.Execute(fn)
	}
	pool.Wg.Wait()
	fmt.Println("pool,task cost: ", time.Since(start))

	pool2 := &Pool2{}
	pool2.init(conNum, &sync.WaitGroup{})
	go pool2.Start()

	start = time.Now()
	for i := 0; i < taskNum; i++ {
		fn := func() {
			time.Sleep(time.Millisecond)
		}
		pool2.Execute(fn)
	}
	pool2.Done()
	fmt.Println("pool2,task cost: ", time.Since(start))
}

耗时:

pool,task cost:  10.555079677s
pool2,task cost:  10.513585021s

结合限流器的协程组

协程组的好处不言而喻,但高效并发执行是以资源占用为代价的,为了避免资源占用太多,可以通过go自带的限流器对可以对协程执行进行限流。如下面的代码,在使用限流器限流前,执行一百万个任务的耗时大约是1.1s,可以粗略认为QPS是百万/s,这是一个相当高的量了,所以打算用限流器进行限流,不让QPS那么高。通过设置限流器的limit参数为十万限制了任务Qps最高为10万/s,结果不出所料。

package main

import (
	"context"
	"fmt"
	"golang.org/x/time/rate"
	"sync"
	"time"
)

type Task func()

type Pool struct {
	ConNum  int             // 并发数
	Task    chan Task       // 任务channel
	Wg      *sync.WaitGroup // 同步组
	Limiter *rate.Limiter
	Ctx     context.Context
}

func (p *Pool) init(conNum int, wg *sync.WaitGroup) {
	p.ConNum = conNum
	p.Wg = wg
	p.Task = make(chan Task)
	p.Limiter = rate.NewLimiter(rate.Limit(100000), 10000)
	p.Ctx = context.Background()
}

func (p *Pool) Start() {
	ch := make(chan struct{}, p.ConNum) // 控制并发数
	for task := range p.Task {
		ch <- struct{}{}
		p.Wg.Add(1)
		go func() {
			defer p.Wg.Done()
			p.Limiter.Wait(p.Ctx)
			task()
			<-ch
		}()
	}
}

// 添加执行任务
func (p *Pool) Execute(task Task) {
	p.Task <- task
}

func main() {
	pool := &Pool{}
	pool.init(100, &sync.WaitGroup{})
	go pool.Start()

	start := time.Now()
	for i := 0; i < 1000000; i++ {
		fn := func() {
			time.Sleep(time.Nanosecond * 10)
		}
		pool.Execute(fn)
	}
	pool.Wg.Wait()
	fmt.Println("tast cost: ", time.Since(start))
}

耗时:

tast cost:  9.89831723s