go 多线程并发 queue demo
生活随笔
收集整理的這篇文章主要介紹了
go 多线程并发 queue demo
小編覺得挺不錯的，現在分享給大家，幫大家做個參考。
原文鏈接:Writing worker queues, in Go
1.work.go
// [root@wangjq queue]# cat work.go

package main

import "time"

// WorkRequest is one unit of work handed from the HTTP collector to a worker.
type WorkRequest struct {
	// Name is the person's name supplied with the request.
	Name string
	// Delay is how long the request asks the worker to wait.
	Delay time.Duration
}

// 2.collector.go
[root@wangjq queue]# cat collector.go package mainimport ("fmt""net/http""time" )// A buffered channel that we can send work requests on. var WorkQueue = make(chan WorkRequest, 100)func Collector(w http.ResponseWriter, r *http.Request) {// Make sure we can only be called with an HTTP POST request.if r.Method != "POST" {w.Header().Set("Allow", "POST")w.WriteHeader(http.StatusMethodNotAllowed)return}// Parse the delay.delay, err := time.ParseDuration(r.FormValue("delay"))if err != nil {http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)return}// Check to make sure the delay is anywhere from 1 to 10 seconds.if delay.Seconds() < 1 || delay.Seconds() > 10 {http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)return}// Now, we retrieve the person's name from the request.name := r.FormValue("name")// Just do a quick bit of sanity checking to make sure the client actually provided us with a name.if name == "" {http.Error(w, "You must specify a name.", http.StatusBadRequest)return}// Now, we take the delay, and the person's name, and make a WorkRequest out of them.work := WorkRequest{Name: name, Delay: delay}// Push the work onto the queue.WorkQueue <- workfmt.Println("Work request queued")// And let the user know their work request was created. w.WriteHeader(http.StatusCreated)return }3.worker.go
[root@wangjq queue]# cat worker.go package mainimport ("fmt""time" )// NewWorker creates, and returns a new Worker object. Its only argument // is a channel that the worker can add itself to whenever it is done its // work. func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {// Create, and return the worker.worker := Worker{ID: id,Work: make(chan WorkRequest),WorkerQueue: workerQueue,QuitChan: make(chan bool)}return worker }type Worker struct {ID intWork chan WorkRequestWorkerQueue chan chan WorkRequestQuitChan chan bool }// This function "starts" the worker by starting a goroutine, that is // an infinite "for-select" loop. func (w *Worker) Start() {go func() {for {// Add ourselves into the worker queue.w.WorkerQueue <- w.Workselect {case work := <-w.Work:// Receive a work request.fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())time.Sleep(work.Delay)fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)case <-w.QuitChan:// We have been asked to stop.fmt.Printf("worker%d stopping\n", w.ID)return}}}() }// Stop tells the worker to stop listening for work requests. // // Note that the worker will only stop *after* it has finished its work. func (w *Worker) Stop() {go func() {w.QuitChan <- true}() }4.dispatcher.go
[root@wangjq queue]# cat dispatcher.go package mainimport "fmt"var WorkerQueue chan chan WorkRequestfunc StartDispatcher(nworkers int) {// First, initialize the channel we are going to but the workers' work channels into.WorkerQueue = make(chan chan WorkRequest, nworkers)// Now, create all of our workers.for i := 0; i < nworkers; i++ {fmt.Println("Starting worker", i+1)worker := NewWorker(i+1, WorkerQueue)worker.Start()}go func() {for {select {case work := <-WorkQueue:fmt.Println("Received work requeust")go func() {worker := <-WorkerQueuefmt.Println("Dispatching work request")worker <- work}()}}}() }5.main.go
[root@wangjq queue]# cat main.go package mainimport ("flag""fmt""net/http" )var (NWorkers = flag.Int("n", 4, "The number of workers to start")HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on") )func main() {// Parse the command-line flags. flag.Parse()// Start the dispatcher.fmt.Println("Starting the dispatcher")StartDispatcher(*NWorkers)// Register our collector as an HTTP handler function.fmt.Println("Registering the collector")http.HandleFunc("/work", Collector)// Start the HTTP server!fmt.Println("HTTP server listening on", *HTTPAddr)if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {fmt.Println(err.Error())} }6.編譯
[root@wangjq queue]# go build -o queued *.go
7.運行
[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
8.測試
[root@wangjq ~]# for i in {1..3}; do curl localhost:8000/work -d name=$USER -d delay=$(expr $i % 11)s; done
9.效果
[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
Work request queued
Received work requeust
Dispatching work request
worker1: Received work request, delaying for 1.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker2: Received work request, delaying for 2.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker4: Received work request, delaying for 3.000000 seconds
worker1: Hello, root!
worker2: Hello, root!
worker4: Hello, root!
轉載于:https://www.cnblogs.com/wangjq19920210/p/11526946.html
總結
以上是生活随笔為你收集整理的go 多线程并发 queue demo的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: es索引介绍
- 下一篇: go chan 缓存与阻塞