Idiomatic variable-size worker pool in Go

Question:

I'm trying to implement a pool of workers in Go. The go-wiki (and Effective Go in the Channels section) feature excellent examples of bounding resource use. Simply make a channel with a buffer that's as large as the worker pool. Then fill that channel with workers, and send them back into the channel when they're done. Receiving from the channel blocks until a worker is available. So the channel and a loop is the entire implementation -- very cool!
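A minimal sketch of the pattern just described, assuming the "workers" are plain integer IDs and the work itself is a placeholder. The buffered channel is pre-filled with one ID per worker; receiving an ID blocks until one is free, and the goroutine sends it back when it finishes. The name runWithPool is hypothetical, not taken from the go-wiki or Effective Go.

```go
package main

import (
	"fmt"
	"sync"
)

// runWithPool (hypothetical) processes len(tasks) items while at most
// `size` run at once. It returns how many items each worker ID handled.
func runWithPool(tasks []string, size int) map[int]int {
	pool := make(chan int, size)
	for id := 0; id < size; id++ {
		pool <- id // fill the channel with "workers"
	}

	var (
		mu     sync.Mutex
		counts = make(map[int]int)
		wg     sync.WaitGroup
	)
	for range tasks {
		id := <-pool // blocks until some worker is available
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mu.Lock()
			counts[id]++ // the real work on the task would go here
			mu.Unlock()
			pool <- id // send the worker back into the channel when done
		}(id)
	}
	wg.Wait()
	return counts
}

func main() {
	counts := runWithPool([]string{"a", "b", "c", "d", "e", "f"}, 2)
	fmt.Println(counts) // which ID handles which task varies between runs
}
```

The channel's capacity fixes the pool size at creation time, which is exactly the limitation the rest of the question is about.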

Alternatively, one could block on sending into the channel, but it's the same idea.
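For comparison, a sketch of the send-to-acquire form, which is the shape of the request-limiting semaphore in Effective Go: send before starting a job, receive when it ends. The function runLimited and its peak counter are illustrative additions used only to show that the limit holds.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited (hypothetical) starts n jobs but lets at most `limit` run at
// once. It returns the highest number of jobs observed running together.
func runLimited(n, limit int) int64 {
	sem := make(chan struct{}, limit) // capacity == concurrency limit
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		sem <- struct{}{} // acquire: blocks while all `limit` slots are taken
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the job ends
			cur := atomic.AddInt64(&running, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			// ... the real job would run here ...
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(100, 4)) // never more than 4
}
```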

My question is about changing the size of the worker pool while it's running. I don't believe there's a way to change the size of a channel. I have some ideas, but most of them seem way too complicated. This page actually implements a semaphore using a channel and empty structs in much the same way, but it has the same problem (these things come up all the time while Googling for "golang semaphore").

I would do it the other way round. Instead of spawning many goroutines (which still require a considerable amount of memory) and using a channel to block them, I would model the workers as goroutines and use a channel to distribute the work. Something like this:

package main

import (
    "fmt"
    "sync"
)

type Task string

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            fmt.Println("processing task", task)
        case <-quit:
            return
        }
    }
}

func main() {
    tasks := make(chan Task, 128)
    quit := make(chan bool)
    var wg sync.WaitGroup

    // spawn 5 workers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute some tasks
    tasks <- Task("foo")
    tasks <- Task("bar")

    // remove two workers
    quit <- true
    quit <- true

    // add three more workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute more tasks
    for i := 0; i < 20; i++ {
        tasks <- Task(fmt.Sprintf("additional_%d", i+1))
    }

    // end of tasks. the workers should quit afterwards
    close(tasks)
    // use "close(quit)", if you do not want to wait for the remaining tasks

    // wait for all workers to shut down properly
    wg.Wait()
}

It might be a good idea to create a separate WorkerPool type with some convenient methods. Also, instead of type Task string, it is quite common to use a struct that also contains a done channel, which is used to signal that the task has been executed successfully.

Edit: I've played around a bit more and came up with the following: http://play.golang.org/p/VlEirPRk8V. It's basically the same example, with a nicer API.
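The playground code is not reproduced here. As a rough idea of what such a wrapper could look like, here is a hypothetical WorkerPool built on the same tasks/quit channels as the example above; the type, its methods, and their names are assumptions, not the API behind the link.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkerPool is a hypothetical wrapper around the tasks/quit pattern.
type WorkerPool struct {
	mu    sync.Mutex
	size  int
	tasks chan string
	quit  chan struct{}
	wg    sync.WaitGroup
}

func NewWorkerPool(size, queue int) *WorkerPool {
	p := &WorkerPool{tasks: make(chan string, queue), quit: make(chan struct{})}
	p.Resize(size)
	return p
}

func (p *WorkerPool) worker() {
	defer p.wg.Done()
	for {
		select {
		case t, ok := <-p.tasks:
			if !ok {
				return // tasks closed and drained
			}
			fmt.Println("processing task", t)
		case <-p.quit:
			return // asked to shrink; only happens between tasks
		}
	}
}

// Resize starts or stops workers until exactly n are running.
// It must not be called after Close.
func (p *WorkerPool) Resize(n int) {
	if n < 0 {
		n = 0
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for p.size < n {
		p.size++
		p.wg.Add(1)
		go p.worker()
	}
	for p.size > n {
		p.size--
		p.quit <- struct{}{} // blocks until one worker picks it up
	}
}

func (p *WorkerPool) Size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.size
}

func (p *WorkerPool) Submit(t string) { p.tasks <- t }

// Close stops accepting tasks and waits for the workers to drain the queue.
func (p *WorkerPool) Close() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	p := NewWorkerPool(5, 128)
	p.Submit("foo")
	p.Resize(3) // remove two workers
	p.Resize(6) // add three more
	for i := 1; i <= 20; i++ {
		p.Submit(fmt.Sprintf("additional_%d", i))
	}
	p.Close()
}
```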

A simple change I can think of is to have a channel that controls the size of the semaphore. The relevant part is the select statement. If there is more work in the queue, process it with the current semaphore. If there is a request to change the size of the semaphore, replace it with a new one of that size and continue processing the request queue with the new semaphore. The old semaphore is garbage collected once the handlers still holding it have finished.

package main

import (
    "fmt"
    "time"
)

type Request struct{ num int }

var quit = make(chan struct{})

func Serve(queue chan *Request, resize chan int, semsize int) {
    // Created once, outside the loop, so that it actually bounds the
    // number of concurrent handlers.
    sem := make(chan struct{}, semsize)
    for {
        select {
        case semsize = <-resize:
            // Swap in a semaphore of the new size. Handlers still running
            // release into the old one, which is then garbage collected.
            sem = make(chan struct{}, semsize)
            fmt.Println("changing semaphore size to", semsize)
        case req := <-queue:
            sem <- struct{}{}   // Block until there's capacity to process a request.
            go handle(req, sem) // Don't wait for handle to finish.
        case <-quit:
            return
        }
    }
}

func process(r *Request) {
    fmt.Println("Handled Request", r.num)
}

func handle(r *Request, sem chan struct{}) {
    process(r) // May take a long time & use a lot of memory or CPU
    <-sem      // Done; enable next request to run.
}

func main() {
    workq := make(chan *Request, 1)
    ctrlq := make(chan int)
    go func() {
        for i := 0; i < 20; i++ {
            time.Sleep(100 * time.Millisecond)
            workq <- &Request{i}
        }
        time.Sleep(500 * time.Millisecond)
        quit <- struct{}{}
    }()
    go func() {
        time.Sleep(500 * time.Millisecond)
        ctrlq <- 10
    }()
    Serve(workq, ctrlq, 1)
}

http://play.golang.org/p/AHOLlAv2LH