使用 Golang 实现 worker pool

介绍

Golang 中的 Channel 可以用来处理并发。下面我们就使用 Channel 来实现一个异步执行任务的 worker pool。

简单实现

package main

import (
	"fmt"
	"time"

	"github.com/gin-gonic/gin"
)

// mq is the buffered channel that carries values from produce to consume.
// It is created when the package is initialized, with room for 10 pending
// values, so no separate init function is required.
var mq = make(chan int64, 10)

// produce pauses for one second and then sends i on mq.
// The send blocks until mq can accept the value.
func produce(i int64) {
	const delay = time.Second

	time.Sleep(delay)
	mq <- i
}

// consume receives values from mq until the channel is closed. For each
// value it pauses for one second and then prints it to standard output.
//
// Ranging over the channel, rather than receiving inside a bare for loop,
// lets the function return once mq is closed and drained instead of looping
// forever on the zero values a closed channel yields.
func consume() {
	for i := range mq {
		time.Sleep(time.Second)
		fmt.Printf("get i %d\n", i)
	}
}

// main starts the background consumer and serves GET /ping with gin.
// Each request starts a goroutine that passes the current Unix time in
// milliseconds to produce, and immediately answers {"message": "pong"}.
func main() {
	go consume()

	r := gin.Default()
	r.GET("/ping", func(c *gin.Context) {
		// Enqueue in the background so the response is not delayed by produce.
		go produce(time.Now().UnixMilli())

		c.JSON(200, gin.H{
			"message": "pong",
		})
	})

	// Run listens and serves on 0.0.0.0:8080 by default. It only returns
	// when serving fails (for example, the port is already in use), so
	// surface that error instead of exiting silently with status 0.
	if err := r.Run(); err != nil {
		panic(fmt.Errorf("running gin server: %w", err))
	}
}

优化

maxWorkers 是最大并发数(即 worker 的数量),maxQueueSize 是 JobQueue 的缓冲区大小,JobQueue 为待执行 job 的队列。

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"strconv"
	"time"
)

func jobHandler(w http.ResponseWriter, req *http.Request) {
	id, _ := strconv.Atoi(req.URL.Query().Get("id"))

	// handle async logic
	job := Job{
		Handle: jobRunHandler,
		ID:     id,
	}
	JobQueue <- job

	io.WriteString(w, "do job ok!\n")
}

// jobRunHandler is the job body used by jobHandler: it pauses for three
// seconds and then prints the ID of the executed job to standard output.
func jobRunHandler(params HandleParams) {
	const workDuration = 3 * time.Second

	time.Sleep(workDuration)
	fmt.Printf("job %d executed \n\n", params.ID)
}

// main initializes the worker pool, registers the /do-job handler on the
// default mux and serves HTTP on port 8081. The error returned by the
// server is logged and the process exits.
func main() {
	// The pool must exist before the first request tries to enqueue a job.
	Setup()

	http.HandleFunc("/do-job", jobHandler)

	err := http.ListenAndServe(":8081", nil)
	log.Fatal(err)
}

// Worker-pool configuration and the shared job queue.
var (
	// JobQueue is the channel on which jobs are submitted for execution.
	// It is nil until Setup creates it.
	JobQueue chan Job

	// maxWorkers is the number of workers the dispatcher starts, i.e. the
	// maximum number of jobs running concurrently.
	maxWorkers = 3

	// maxQueueSize is the buffer capacity Setup gives JobQueue.
	maxQueueSize = 5
)

// Setup initializes the worker pool: it allocates JobQueue with capacity
// maxQueueSize, then builds a dispatcher for maxWorkers workers reading from
// that queue and runs it.
func Setup() {
	JobQueue = make(chan Job, maxQueueSize)

	NewDispatcher(JobQueue, maxWorkers).run()
}

// HandleParams carries the arguments passed to a job's Handle function.
type HandleParams struct {
	// ID is the ID of the job being executed.
	ID int
}

// Job holds the attributes needed to perform a unit of work.
type Job struct {
	// Handle is the function a worker calls to execute the job.
	Handle func(HandleParams)
	// ID identifies the job; workers pass it to Handle and use it in logs.
	ID     int
}

// NewWorker returns a Worker with the given numeric id that is attached to
// the shared workerPool channel. The worker gets its own unbuffered job
// channel and quit channel; it does nothing until start is called.
func NewWorker(id int, workerPool chan chan Job) Worker {
	var w Worker

	w.id = id
	w.workerPool = workerPool
	w.jobQueue = make(chan Job)
	w.quitChan = make(chan bool)

	return w
}

// Worker executes jobs one at a time in a goroutine launched by start.
type Worker struct {
	// id is the worker number used in log messages.
	id         int
	// jobQueue is this worker's own channel on which it receives jobs.
	jobQueue   chan Job
	// workerPool is the shared channel on which the worker offers jobQueue
	// each time it is ready for another job.
	workerPool chan chan Job
	// quitChan signals the worker goroutine to stop.
	quitChan   chan bool
}

// start launches the worker's goroutine. On every iteration the worker
// offers its job channel on workerPool and then waits for either a job,
// which it executes via job.Handle, or a quit signal, which ends the
// goroutine.
func (w Worker) start() {
	serve := func() {
		for {
			// Announce availability by publishing our job channel.
			w.workerPool <- w.jobQueue

			select {
			case <-w.quitChan:
				// Stop was requested.
				log.Printf("worker%d stopping\n", w.id)
				return
			case job := <-w.jobQueue:
				// The dispatcher handed us a job.
				log.Printf("worker%d: started job_id %d\n", w.id, job.ID)

				params := HandleParams{ID: job.ID}
				job.Handle(params)

				log.Printf("worker%d: completed job_id %d!\n", w.id, job.ID)
			}
		}
	}

	go serve()
}

// stop asks the worker to quit without blocking the caller: the quit signal
// is sent from a separate goroutine, which stays blocked until something
// receives from quitChan.
func (w Worker) stop() {
	quit := w.quitChan

	go func() { quit <- true }()
}

// NewDispatcher returns a Dispatcher that reads jobs from jobQueue and is
// configured for maxWorkers workers. It allocates the worker pool channel
// with capacity maxWorkers; no goroutines are started until run is called.
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
	d := new(Dispatcher)

	d.workerPool = make(chan chan Job, maxWorkers)
	d.maxWorkers = maxWorkers
	d.jobQueue = jobQueue

	return d
}

// Dispatcher starts the workers and hands incoming jobs to them.
type Dispatcher struct {
	// workerPool receives the job channel of each worker that is ready
	// for another job.
	workerPool chan chan Job
	// maxWorkers is the number of workers started by run.
	maxWorkers int
	// jobQueue is the source of jobs to dispatch.
	jobQueue   chan Job
}

// run starts maxWorkers workers, numbered from 1 and attached to the
// dispatcher's worker pool, and then launches the dispatch loop in its own
// goroutine. It returns immediately.
func (d *Dispatcher) run() {
	for n := 0; n < d.maxWorkers; n++ {
		id := n + 1
		NewWorker(id, d.workerPool).start()
	}

	go d.dispatch()
}

// dispatch forwards jobs from d.jobQueue to idle workers until the queue is
// closed. For each job it waits for a worker to offer its job channel on
// d.workerPool and then sends the job on that channel.
//
// Jobs are handed over one at a time from this single goroutine rather than
// from one goroutine per job. This keeps jobs in FIFO order and keeps the
// backlog bounded: while every worker is busy, pending jobs stay in jobQueue
// (so its capacity is an effective limit) instead of piling up in an
// unbounded number of waiting goroutines.
//
// The hand-off send blocks until the chosen worker receives the job, so a
// worker that has exited while its channel is still in the pool would stall
// the loop.
func (d *Dispatcher) dispatch() {
	for job := range d.jobQueue {
		log.Printf("fetching workerJobQueue for job %d\n", job.ID)
		workerJobQueue := <-d.workerPool // blocks until a worker is idle

		log.Printf("adding job %d to workerJobQueue\n", job.ID)
		workerJobQueue <- job
	}
}

命令行并发测试

# Send 10 requests in a row; the URL is quoted so the shell does not treat "?" as a glob.
for i in {1..10}; do curl "http://127.0.0.1:8081/do-job?id=$i"; done

Ref

comments powered by Disqus